设为首页 收藏本站
查看: 826|回复: 0

[经验分享] 基于Redis的消息队列php-resque

[复制链接]

尚未签到

发表于 2015-8-25 04:02:11 | 显示全部楼层 |阅读模式
  转载:http://netstu.5iunix.net/archives/201305-835/
  最近的做一个短信群发的项目,需要用到消息队列。因此开始了我对消息队列选型的漫长路.
  为什么选型会纠结呢,直接使用ActiveMQ,RabittMQ,Gearman等流行的消息队列不就可以了吗? 在这个项目中,只有单台服务器,而且我采用了redis来做系统缓存,同时开启了php apc来缓存phalcon model Metadata.如果再开启其他进程,需要很合理的分配各个应用的资源,如果分配不好,极有可能产生浪费,同时还有可能造成应用性能低下.
  因此,为了实现项目结构的简单化,同时简化维护,我就打算用Redis来实现,但短信群发过程中,必须要有优先级别的,直接使用Redis的 List显然不是太合适,本着这一打算,我还专门google了一下,发现有人使用zset来模拟FIFO,实现LIST中的lpush,rpop操作。
  通过Google发现有一个基于Redis的比较不错的消息队列 Resque https://github.com/resque/resque 而此消息队列则是使用ruby语言实现的.我更不想一个项目中使用多种语言了,幸好,马上又发现了基于resque的PHP实现, php-resque https://github.com/chrisboulton/php-resque
  Resque还包括很多其他插件以及其他语言的实现,详细列表请看官方列表 https://github.com/resque/resque/wiki/Alternate-Implementations
  php-resque本身并没有包括查看消息队列status的用户界面,但我们可以通过命令行查看,同时由于php-resque的消息队列实现和resque中命名一致,可以安装ruby的web界面来进行查看.
  我一直使用的是phpredis扩展. php-resque使用Redis Client是 Credis, https://github.com/colinmollenhour/credis ,而Credis的API完全是基于phpredis来封装的,即API一样,前者是C扩展,后者是PHP实现.
  促使我最终决定使用Redis及PHP-Resque的一个最要原因是PHP-Resque内置可同时开启多个进程,对于海量数据下发来说非常必要,同时不需要自己再写代码实现多进程或多线程了.
  下面具体介绍一下PHP-Resque的安装与使用.
  PHP-Resque安装
  1.环境需求


  • PHP 5.3+
  • Redis 2.2+
  • Optional but Recommended: Composer
  2.安装Composer



$ curl -sS https://getcomposer.org/installer | php
$ mv composer.phar /usr/local/bin/composer
  3.安装PHP-Resque



git clone git://github.com/chrisboulton/php-resque.git
cd php-resque
composer install
  php-resque的队列实现
  所有的消息队列都差不多,有生产者,消费者,还有Task Server.只不过名称不同而已.
  生产者:接受用户提交信息,组成消息队列中的一个任务,并将其加入消息队列服务中.
  消费者:一般为一个守护进程,从消息队列服务器中取出任务,对任务做出相应的处理.PHP-Resque中的消息者为守护进程+Job Class .即由守护进程从消息队列中取出任务,具体的任务中在任务的生产过程中已经指定了由哪个Job Class来做具体的处理.
  Task Server: 消息队列服务器,本处指redis服务.
  具体实现
  下载安装PHP-Resque后,在php-resque/demo目录中,有类似如下几个文件(我修改测试过,和原有目录不太一样了):



[iyunv@myfc demo]# ll
total 52
-rw-r--r-- 1 root root  113 Apr 17 17:48 bad_job.php
-rw-r--r-- 1 root root  494 Apr 17 17:48 check_status.php
-rw-r--r-- 1 root root 1629 Apr 28 15:43 cli-bootstrap.php
-rw-r--r-- 1 root root  396 Mar 25 10:27 config.ini
-rw-r--r-- 1 root root    6 Apr 18 11:26 index.php
-rw-r--r-- 1 root root  680 Apr 17 17:48 init.php
-rw-r--r-- 1 root root  199 Apr 18 13:13 job.php
-rw-r--r-- 1 root root   78 Apr 17 17:48 long_job.php
-rw-r--r-- 1 root root  325 Apr 28 15:51 MtSend_Job.php
-rw-r--r-- 1 root root   97 Apr 18 12:05 php_error_jobFF.php
-rw-r--r-- 1 root root  382 Apr 18 11:43 queue.php
-rw-r--r-- 1 root root  160 Apr 28 18:09 resque.php
-rw-r--r-- 1 root root   80 Apr 18 13:22 Test_Job.php
  生成任务(创建一个Job)



// Required if redis is located elsewhere
Resque::setBackend('localhost:6379');
$args = array(
'name' => 'Chris'
);
Resque::enqueue('default', 'My_Job', $args);
  其中第一行为连接redis服务
  第二行的数组是任务的内容,可随意编写.通过Resque::enqueue 序列化后加入到消息队列中.
  最后一行的第一个参数表示 消息队列的名称(可随意标记,比如 email,log等),第二个参数表示取出任务后,由My_Job这个类来处理此条任务.
  同时需要说明的是: 在PHP-Resque中也没有数字标识的优化级,但可以通过消息队列名称来实现优化级,具体下面会讲到.
  定义Job Class



class My_Job
{
public function setUp()
{
// ... Set up environment for this job
    }
public function perform()
{
// .. Run job
    }
public function tearDown()
{
// ... Remove environment for this job
    }
}
  从上面的代码中可以看出,这是一个非常简单的PHP类文件, 方法perform是必须的,主要用来处理任务.其他两个方法可有可无,根据你自己的应用环境看是否需要.
  Demo目录中的job.php (修改过)



<?php
class PHP_Job
{
public function perform()
{
sleep(5);
// fwrite(STDOUT, 'Hello!');
$p = $this->args['name'];
echo 'ttime '.$p;
// print_r(json_decode($p));
echo "string";
}
}
?>
  从上面的例子可以看出,如果想在Job Class中读取任务的具体内容,可直接使用 $this->args['name']来进行读取.按上面的例子来说,它将输出 Chris
  测试消息队列
  (1)创建任务



[iyunv@myfc demo]# cd php-resque/demo
[iyunv@myfc demo]# php queue.php PHP_Job
Queued job bcd1cb6f39d72b9b786716f81b757370
  queue.php代码如下:



<?php
if(empty($argv[1])) {
die('Specify the name of a job to add. e.g, php queue.php PHP_Job');
}
require __DIR__ . '/init.php';
date_default_timezone_set('GMT');
Resque::setBackend('127.0.0.1:6379');
$args = array(
'time' => time(),
'array' => array(
'test' => 'test',
),
);
$jobId = Resque::enqueue('default', $argv[1], $args, true);
echo "Queued job ".$jobId."\n\n";
  此时查看Redis,会发现多了三条数据



resque:queues,  default
resque:queue:default, {"class":"PHP_Job","args":[{"time":1368167779,"array":{"test":"test"}}],"id":"875234d18af92212a7d60056daeae910"}
resque:job:875234d18af92212a7d60056daeae910:status, {"status":1,"updated":1368167779,"started":1368167779}
  注意: “,”逗号前是键名,后面是键值
  在开启守护进程以前,请确保PHP_Job存在,PHP_Job为PHP类名,而非PHP文件名. 如果PHP_Job不存在,则此条任务会处理失败.(因为根本不存在处理它的文件)
  (2)处理任务
  在demo目录中,守护进程是由目录下的resque.php来进行,代码如下:



<?php
date_default_timezone_set('GMT');
require 'bad_job.php';
require 'job.php';
require 'long_job.php';
require 'MtSend_Job.php';
require '../bin/resque';
  Job Class需要放到此文件头部引入,当然也可以使用其他方式引入. 上面代码中job.php即是类PHP_Job文件
  开启守护进程参数说明:
  在上面的代码中,最后一行引入的../bin/resque 其实也是一个php cli程序,代码如下:



#!/usr/bin/env php
<?php
// Find and initialize Composer
$files = array(
__DIR__ . '/../../vendor/autoload.php',
__DIR__ . '/../../../autoload.php',
__DIR__ . '/../../../../autoload.php',
__DIR__ . '/../vendor/autoload.php',
);
$found = false;
foreach ($files as $file) {
if (file_exists($file)) {
require_once $file;
break;
}
}
if (!class_exists('Composer\Autoload\ClassLoader', false)) {
die(
'You need to set up the project dependencies using the following commands:' . PHP_EOL .
'curl -s http://getcomposer.org/installer | php' . PHP_EOL .
'php composer.phar install' . PHP_EOL
);
}
$QUEUE = getenv('QUEUE');
if(empty($QUEUE)) {
die("Set QUEUE env var containing the list of queues to work.\n");
}
$REDIS_BACKEND = getenv('REDIS_BACKEND');
$REDIS_BACKEND_DB = getenv('REDIS_BACKEND_DB');
if(!empty($REDIS_BACKEND)) {
if (empty($REDIS_BACKEND_DB))
Resque::setBackend($REDIS_BACKEND);
else
Resque::setBackend($REDIS_BACKEND, $REDIS_BACKEND_DB);
}
$logLevel = 0;
$LOGGING = getenv('LOGGING');
$VERBOSE = getenv('VERBOSE');
$VVERBOSE = getenv('VVERBOSE');
if(!empty($LOGGING) || !empty($VERBOSE)) {
$logLevel = Resque_Worker::LOG_NORMAL;
}
else if(!empty($VVERBOSE)) {
$logLevel = Resque_Worker::LOG_VERBOSE;
}
$APP_INCLUDE = getenv('APP_INCLUDE');
if($APP_INCLUDE) {
if(!file_exists($APP_INCLUDE)) {
die('APP_INCLUDE ('.$APP_INCLUDE.") does not exist.\n");
}
require_once $APP_INCLUDE;
}
$interval = 5;
$INTERVAL = getenv('INTERVAL');
if(!empty($INTERVAL)) {
$interval = $INTERVAL;
}
$count = 1;
$COUNT = getenv('COUNT');
if(!empty($COUNT) && $COUNT > 1) {
$count = $COUNT;
}
$PREFIX = getenv('PREFIX');
if(!empty($PREFIX)) {
fwrite(STDOUT, '*** Prefix set to '.$PREFIX."\n");
Resque_Redis::prefix($PREFIX);
}
if($count > 1) {
for($i = 0; $i < $count; ++$i) {
$pid = Resque::fork();
if($pid == -1) {
die("Could not fork worker ".$i."\n");
}
// Child, start the worker
else if(!$pid) {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
break;
}
}
}
// Start a single worker
else {
$queues = explode(',', $QUEUE);
$worker = new Resque_Worker($queues);
$worker->logLevel = $logLevel;
$PIDFILE = getenv('PIDFILE');
if ($PIDFILE) {
file_put_contents($PIDFILE, getmypid()) or
die('Could not write PID information to ' . $PIDFILE);
}
fwrite(STDOUT, '*** Starting worker '.$worker."\n");
$worker->work($interval);
}
  从上面的代码可以看出,在命令行启动守护进程时,可以传递一些环境变量.而 ../bin/resque通过getenv函数来获取环境变量.
  支持的环境变量有:
  ========================================================================================================
  QUEUE – 這个是必要的,会決定 worker 要執行什麼任務,重要的在前,例如 QUEUE=default,notify,mail,log 。也可以設定為 QUEUE=* 表示執行所有任務。
  APP_INCLUDE – 这也可以说是必要的,因為 Resque 的 Job 都是写成PHP类,那 worker 執行的時候當然要把物件的檔案引入進來。可以設成 APP_INCLUDE=require.php 再在 require.php 中引入所有 Job 的 Class 文件即可。
  COUNT – 設定 worker 數量,預設是1 COUNT=5 。
  REDIS_BACKEND – 設定 Redis 的 ip, port。如果沒設定,預設是連 localhost:6379 。
  LOGGING, VERBOSE – 設定 log, VERBOSE=1 即可。
  VVERBOSE – 比較詳細的 log, VVERBOSE=1 debug 的時候可以開出來看。
  INTERVAL – worker 檢查 queue 的間隔,預設是五秒 INTERVAL=5 。
  PIDFILE – 如果你是開單 worker,可以指定 PIDFILE 把 pid 寫入,例如 PIDFILE=/var/run/resque.pid 。
  =============================================================================
  启动守护进程:



QUEUE=* APP_INCLUDE=require.php COUNT=5 VVERBOSE=1 php resque.php
  QUEUE=* 表明处理所有的消息队列,此时则没有优化级别.如果需要优化级别,可以按先后顺序列出,如 QUEUE=mail1,mail2,mail3
  APP_INCLUDE=require.php 文件中,可以引入其他的一些Job Class或其他辅助类,当然也可以直接在 demo/resque.php中引入
  启动守护进程后:



[iyunv@myfc demo]# QUEUE=*  COUNT=5 VVERBOSE=1 php resque.php
#!/usr/bin/env php
*** Starting worker myfc:22143:*
*** Starting worker myfc:22144:*
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22145:*
** [07:25:09 2013-05-10] Registered signals
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22146:*
** [07:25:09 2013-05-10] Registered signals
*** Starting worker myfc:22148:*
** [07:25:09 2013-05-10] Registered signals
[iyunv@myfc demo]# ** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Found job on default
** [07:25:09 2013-05-10] got (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}])
** [07:25:09 2013-05-10] Forked 22163 at 2013-05-10 07:25:09
** [07:25:09 2013-05-10] Processing default since 2013-05-10 07:25:09
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
** [07:25:09 2013-05-10] Checking default
** [07:25:09 2013-05-10] Sleeping for 5
ttime 1368167779string** [07:25:14 2013-05-10] done (Job{default} | ID: 875234d18af92212a7d60056daeae910 | PHP_Job | [{"time":1368167779,"array":{"test":"test"}}])
** [07:26:29 2013-05-10] Checking default
** [07:26:29 2013-05-10] Checking default
** [07:26:29 2013-05-10] Sleeping for 5
** [07:26:29 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Checking default
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:34 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Checking default
** [07:26:39 2013-05-10] Sleeping for 5
** [07:26:39 2013-05-10] Sleeping for 5
  可以看出,已经成功取出任务,并使用PHP_Job处理.然后守护进程一直在运行,等待处理新的任务
  此时看一下进程文件:



[iyunv@myfc sop]# ps -aux|grep php
root     22143  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22144  0.0  0.0 310700 11580 pts/2    S    15:25   0:00 php resque.php
root     22145  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22146  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
root     22148  0.0  0.0 310700 11496 pts/2    S    15:25   0:00 php resque.php
  查看任务的状态
  在生成任务的时候,使用 $token = Resque::enqueue('default', 'My_Job', $args, true); 生成一个任务,第四个参数设置为true,会返回任务的JobID,通过JobID可以查看任务的状态.



$status = new Resque_Job_Status($token);
echo $status->get(); // Outputs the status
  在类文件Resque_Job_Status中定义了所有类型任务状态



Resque_Job_Status::STATUS_WAITING - 等待被取出处理
Resque_Job_Status::STATUS_RUNNING - 任务正在被处理
Resque_Job_Status::STATUS_FAILED - 任务处理失败
Resque_Job_Status::STATUS_COMPLETE - 任务处理完成
  事件触发接口 暂时没用到,可查看官方文档
  本来想把选型,使用,以及整合Phalcon写到一篇文章中,看来太长,还是分开吧.抽空把集成到Phalcon以及消息队列实际应用写一下.
  
  参考文档:
  http://avnpc.com/pages/run-background-task-by-php-resque
  http://blog.hsatac.net/2012/01/php-resque-introduction/
  http://kamisama.me/2012/10/12/background-jobs-with-php-and-resque-part-4-managing-worker/
  https://github.com/kamisama/Fresque
  http://stackoverflow.com/questions/11814445/what-is-the-proper-way-to-setup-and-use-php-resque

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-103707-1-1.html 上篇帖子: php快速url重写 下篇帖子: PHP判断客户端是PCweb端还是移动手机端方法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表