设为首页 收藏本站
查看: 818|回复: 0

[经验分享] php消息队列

[复制链接]

尚未签到

发表于 2015-8-24 16:04:17 | 显示全部楼层 |阅读模式



  Memcache 一般用于缓存服务。但是很多时候,比如一个消息广播系统,需要一个消息队列。直接从数据库取消息,负载往往不行。如果将整个消息队列用一个key缓存到memcache里面。对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费 。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。
  
  php-通过共享内存实现消息队列和进程通信的两个类





<?php  
class MQ{  
public static $client;  
private static $m_real;  
private static $m_front;  
private static $m_data = array();  
const QUEUE_MAX_NUM = 100000000;  
const QUEUE_FRONT_KEY = '_queue_item_front';  
const QUEUE_REAL_KEY = '_queue_item_real';  
public static function setupMq($conf) {  
self::$client = memcache_pconnect($conf);  
self::$m_real = memcache_get(self::$client, self::QUEUE_REAL_KEY);  
self::$m_front = memcache_get(self::$client, self::QUEUE_FRONT_KEY);  
if (!isset(self::$m_real) || empty(self::$m_real)) {  
self::$real= 0;  
}  
if (!isset(self::$m_front) || empty(self::$m_front)) {  
self::$m_front = 0;  
}  
return self::$client;  
}  
public static function add($queue, $data) {  
$result = false;  
if (self::$m_real < self::QUEUE_MAX_NUM) {  
if (memcache_add(self::$client, $queue.self::$m_real, $data)) {  
self::mqRealChange();  
$result = true;  
}  
}  

return $result;  
}  
public static function get($key, $count) {  
$num = 0;  
for ($i=self::$m_front;$i<self::$m_front + $count;$i++) {  
if ($dataTmp = memcache_get(self::$client, $key.$i)) {  
self::$m_data[] = $dataTmp;  
memcache_delete(self::$client, $key.$i);  
$num++;  
}  
}  
if ($num>0) {  
self::mqFrontChange($num);  
}  
return self::$m_data;  
}  
private static function mqRealChange() {  
memcache_add(self::$client, self::QUEUE_REAL_KEY, 0);  
self::$m_real = memcache_increment(self::$client, self::QUEUE_REAL_KEY, 1);  
}  

private static function mqFrontChange($num) {  
memcache_add(self::$client, self::QUEUE_FRONT_KEY, 0);  
self::$m_front = memcache_increment(self::$client, self::QUEUE_FRONT_KEY, $num);  
}  
public static function mflush($memcache_obj) {  
memcache_flush($memcache_obj);  
}  
public static function Debug() {  
echo 'real:'.self::$m_real."<br>/r/n";  
echo 'front:'.self::$m_front."<br>/r/n";  
echo 'wait for process data:'.intval(self::$m_real - self::$m_front);  
echo "<br>/r/n";  
echo '<pre>';  
print_r(self::$m_data);  
echo '<pre>';  
}  
}  
define('FLUSH_MQ',0);//CLEAN ALL DATA  
define('IS_ADD',0);//SET DATA  
$mobj = MQ::setupMq('127.0.0.1','11211');  
if (FLUSH_MQ) {  
MQ::mflush($mobj);  
} else {  
if (IS_ADD) {  
MQ::add('user_sync', '1test');  
MQ::add('user_sync', '2test');  
MQ::add('user_sync', '3test');  
MQ::add('user_sync', '4test');  
MQ::add('user_sync', '5test');  
MQ::add('user_sync', '6test');  
} else {  
MQ::get('user_sync', 10);  
}  

}  
MQ::Debug();  
实现消息队列,可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq…..,下面是两个基本简单的实现方式:
使用memcache方法来实现
Java代码  
<?php  
/*
* memcache队列类
* 支持多进程并发写入、读取
* 边写边读,AB面轮值替换
* @author lkk/lianq.net
* @create on 9:25 2012-9-28
*
* @example:
$obj = new memcacheQueue('duilie');
$obj->add('1asdf');
$obj->getQueueLength();
$obj->read(11);
$obj->get(8);
*/  

class memcacheQueue{  
public static   $client;            //memcache客户端连接  
public          $access;            //队列是否可更新     
private         $currentSide;       //当前轮值的队列面:A/B  
private         $lastSide;          //上一轮值的队列面:A/B  
private         $sideAHead;         //A面队首值  
private         $sideATail;         //A面队尾值  
private         $sideBHead;         //B面队首值  
private         $sideBTail;         //B面队尾值  
private         $currentHead;       //当前队首值  
private         $currentTail;       //当前队尾值  
private         $lastHead;          //上轮队首值  
private         $lastTail;          //上轮队尾值   
private         $expire;            //过期时间,秒,1~2592000,即30天内;0为永不过期  
private         $sleepTime;         //等待解锁时间,微秒  
private         $queueName;         //队列名称,唯一值  
private         $retryNum;          //重试次数,= 10 * 理论并发数  

const   MAXNUM      = 2000;                 //(单面)最大队列数,建议上限10K  
const   HEAD_KEY    = '_lkkQueueHead_';     //队列首key  
const   TAIL_KEY    = '_lkkQueueTail_';     //队列尾key  
const   VALU_KEY    = '_lkkQueueValu_';     //队列值key  
const   LOCK_KEY    = '_lkkQueueLock_';     //队列锁key  
const   SIDE_KEY    = '_lkkQueueSide_';     //轮值面key  

/*
* 构造函数
* @param   [config]    array   memcache服务器参数
* @param   [queueName] string  队列名称
* @param   [expire]    string  过期时间
* @return  NULL
*/  
public function __construct($queueName ='',$expire='',$config =''){  
if(empty($config)){  
self::$client = memcache_pconnect('localhost',11211);  
}elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')  

self::$client = memcache_pconnect($config['host'],$config['port']);  
}elseif(is_string($config)){//"127.0.0.1:11211"  
$tmp = explode(':',$config);  
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';  
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';  
self::$client = memcache_pconnect($conf['host'],$conf['port']);      
}  
if(!self::$client) return false;  

ignore_user_abort(TRUE);//当客户断开连接,允许继续执行  
set_time_limit(0);//取消脚本执行延时上限  

$this->access = false;  
$this->sleepTime = 1000;  
$expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;  
$this->expire = $expire;  
$this->queueName = $queueName;  
$this->retryNum = 10000;  

$side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);  
$this->getHeadNTail($queueName);  
if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;  
if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;  
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  
if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;  
}  

/*
* 获取队列首尾值
* @param   [queueName] string  队列名称
* @return  NULL
*/  
private function getHeadNTail($queueName){  
$this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);  
$this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);  
$this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);  
$this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);  
}  

/*
* 获取当前轮值的队列面
* @return  string  队列面名称
*/  
public function getCurrentSide(){  
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  
if($currentSide == 'A'){  
$this->currentSide = 'A';  
$this->lastSide = 'B';     

$this->currentHead   = $this->sideAHead;  
$this->currentTail   = $this->sideATail;  
$this->lastHead      = $this->sideBHead;  
$this->lastTail      = $this->sideBTail;            
}else{  
$this->currentSide = 'B';  
$this->lastSide = 'A';  

$this->currentHead   = $this->sideBHead;  
$this->currentTail   = $this->sideBTail;  
$this->lastHead      = $this->sideAHead;  
$this->lastTail      = $this->sideATail;                        
}  

return $this->currentSide;  
}  

/*
* 队列加锁
* @return boolean
*/  
private function getLock(){  
if($this->access === false){  
while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){  
usleep($this->sleepTime);  
@$i++;  
if($i > $this->retryNum){//尝试等待N次  
return false;  
break;  
}  
}  
return $this->access = true;  
}  
return false;  
}  

/*
* 队列解锁
* @return NULL
*/  
private function unLock(){  
memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);  
$this->access = false;  
}  

/*
* 添加数据
* @param   [data]  要存储的值
* @return  boolean
*/  
public function add($data){  
$result = false;  
if(!$this->getLock()){  
return $result;  
}   
$this->getHeadNTail($this->queueName);  
$this->getCurrentSide();  

if($this->isFull()){  
$this->unLock();  
return false;  
}  

if($this->currentTail < self::MAXNUM){  
$value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;  
if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){  
$this->changeTail();  
$result = true;  
}  
}else{//当前队列已满,更换轮值面  
$this->unLock();  
$this->changeCurrentSide();  
return $this->add($data);  
}  

$this->unLock();  
return $result;  
}  

/*
* 取出数据
* @param   [length]    int 数据的长度
* @return  array
*/  
public function get($length=0){  
if(!is_numeric($length)) return false;  
if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有  
if(!$this->getLock()) return false;  

if($this->isEmpty()){  
$this->unLock();  
return false;  
}  

$keyArray   = $this->getKeyArray($length);  
$lastKey    = $keyArray['lastKey'];  
$currentKey = $keyArray['currentKey'];  
$keys       = $keyArray['keys'];  
$this->changeHead($this->lastSide,$lastKey);  
$this->changeHead($this->currentSide,$currentKey);  

$data   = @memcache_get(self::$client, $keys);  
foreach($keys as $v){//取出之后删除  

@memcache_delete(self::$client, $v, 0);  
}  
$this->unLock();  

return $data;  
}  

/*
* 读取数据
* @param   [length]    int 数据的长度
* @return  array
*/  
public function read($length=0){  
if(!is_numeric($length)) return false;  
if(empty($length)) $length = self::MAXNUM * 2;//默认读取所有  
$keyArray   = $this->getKeyArray($length);  
$data   = @memcache_get(self::$client, $keyArray['keys']);  
return $data;  
}  

/*
* 获取队列某段长度的key数组
* @param   [length]    int 队列长度
* @return  array
*/  
private function getKeyArray($length){  
$result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());  
$this->getHeadNTail($this->queueName);  
$this->getCurrentSide();  
if(empty($length)) return $result;  

//先取上一面的key  
$i = $result['lastKey'] = 0;  
for($i=0;$i<$length;$i++){  
$result['lastKey'] = $this->lastHead + $i;  
if($result['lastKey'] >= $this->lastTail) break;  
$result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];  
}  

//再取当前面的key  
$j = $length - $i;  
$k = $result['currentKey'] = 0;  
for($k=0;$k<$j;$k++){  
$result['currentKey'] = $this->currentHead + $k;  
if($result['currentKey'] >= $this->currentTail) break;  
$result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];  
}  

return $result;  
}  

/*
* 更新当前轮值面队列尾的值
* @return  NULL
*/  
private function changeTail(){  
$tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;  
memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果没有,则插入;有则false;  
//memcache_increment(self::$client, $tail_key, 1);//队列尾+1  
$v = memcache_get(self::$client, $tail_key) +1;  
memcache_set(self::$client, $tail_key,$v,false,$this->expire);  
}  

/*
* 更新队列首的值
* @param   [side]      string  要更新的面
* @param   [headValue] int     队列首的值
* @return  NULL
*/  
private function changeHead($side,$headValue){  
if($headValue < 1) return false;  
$head_key = $this->queueName .$side . self::HEAD_KEY;  
$tail_key = $this->queueName .$side . self::TAIL_KEY;  
$sideTail = memcache_get(self::$client, $tail_key);  
if($headValue < $sideTail){  
memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);  
}elseif($headValue >= $sideTail){  
$this->resetSide($side);  
}  
}  

/*
* 重置队列面,即将该队列面的队首、队尾值置为0
* @param   [side]  string  要重置的面
* @return  NULL
*/  
private function resetSide($side){  
$head_key = $this->queueName .$side . self::HEAD_KEY;  
$tail_key = $this->queueName .$side . self::TAIL_KEY;  
memcache_set(self::$client, $head_key,0,false,$this->expire);  
memcache_set(self::$client, $tail_key,0,false,$this->expire);  
}  


/*
* 改变当前轮值队列面
* @return  string
*/  
private function changeCurrentSide(){  
$currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);  
if($currentSide == 'A'){  
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);  
$this->currentSide = 'B';  
}else{  
memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);  
$this->currentSide = 'A';  
}  
return $this->currentSide;  
}  

/*
* 检查当前队列是否已满
* @return  boolean
*/  
public function isFull(){  
$result = false;  
if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){  
$result = true;  
}  
return $result;  
}  

/*
* 检查当前队列是否为空
* @return  boolean
*/  
public function isEmpty(){  
$result = true;  
if($this->sideATail > 0 || $this->sideBTail > 0){  
$result = false;  
}  
return $result;  
}  

/*
* 获取当前队列的长度
* 该长度为理论长度,某些元素由于过期失效而丢失,真实长度小于或等于该长度
* @return  int
*/  
public function getQueueLength(){  
$this->getHeadNTail($this->queueName);  
$this->getCurrentSide();  

$sideALength = $this->sideATail - $this->sideAHead;  
$sideBLength = $this->sideBTail - $this->sideBHead;  
$result = $sideALength + $sideBLength;  

return $result;  
}  


/*
* 清空当前队列数据,仅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三个key
* @return  boolean
*/  
public function clear(){  
if(!$this->getLock()) return false;  
for($i=0;$i<self::MAXNUM;$i++){  
@memcache_delete(self::$client, $this->queueName.'A'. self::VALU_KEY .$i, 0);  
@memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);  
}  
$this->unLock();  
$this->resetSide('A');  
$this->resetSide('B');  
return true;  
}  

/*
* 清除所有memcache缓存数据
* @return  NULL
*/  
public function memFlush(){  
memcache_flush(self::$client);  
}  
}  
  
  利用PHP操作Linux消息队列完成进程间通信
  当我们开发的系统需要使用多进程方式运行时,进程间通信便成了至关重要的环节。消息队列(message queue)是Linux系统进程间通信的一种方式。
  关于Linux系统进程通信的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/
  关于Linux系统消息队列的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/
  PHP的sysvmsg模块是对Linux系统支持的System V IPC中的System V消息队列函数族的封装。我们需要利用sysvmsg模块提供的函数来进进程间通信。先来看一段示例代码_1:




Java代码  <?php  
  $message_queue_key = ftok(__FILE__, 'a');  
    
  $message_queue = msg_get_queue($message_queue_key, 0666);  
  var_dump($message_queue);  
    
  $message_queue_status = msg_stat_queue($message_queue);  
  print_r($message_queue_status);  
    
  //向消息队列中写  
  msg_send($message_queue, 1, "Hello,World!");  
    
  $message_queue_status = msg_stat_queue($message_queue);  
  print_r($message_queue_status);  
    
  //从消息队列中读  
  msg_receive($message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT);  
  print_r($message."\r\n");  
    
  msg_remove_queue($message_queue);  
  ?>  

这段代码的运行结果如下:



Java代码  resource(4) of type (sysvmsg queue)  
  Array  
  (  
      [msg_perm.uid] => 1000  
      [msg_perm.gid] => 1000  
      [msg_perm.mode] => 438  
      [msg_stime] => 0  
      [msg_rtime] => 0  
      [msg_ctime] => 1279849495  
      [msg_qnum] => 0  
      [msg_qbytes] => 16384  
      [msg_lspid] => 0  
      [msg_lrpid] => 0  
  )  
  Array  
  (  
      [msg_perm.uid] => 1000  
      [msg_perm.gid] => 1000  
      [msg_perm.mode] => 438  
      [msg_stime] => 1279849495  
      [msg_rtime] => 0  
      [msg_ctime] => 1279849495  
      [msg_qnum] => 1  
      [msg_qbytes] => 16384  
      [msg_lspid] => 2184  
      [msg_lrpid] => 0  
  )  
  Hello,World!  

可以看到已成功从消息队列中读取“Hello,World!”字符串  下面列举一下示例代码中的主要函数:




Java代码  ftok ( string $pathname , string $proj )   
      手册上给出的解释是:Convert a pathname and a project identifier to a System V IPC key。这个函数返回的键值唯一对应linux系统中一个消息队列。在获得消息队列的引用之前都需要调用这个函数。  
    
  msg_get_queue ( int $key [, int $perms ] )  
      msg_get_queue()会根据传入的键值返回一个消息队列的引用。如果linux系统中没有消息队列与键值对应,msg_get_queue()将会创建一个新的消息队列。函数的第二个参数需要传入一个int值,作为新创建的消息队列的权限值,默认为0666。这个权限值与linux命令chmod中使用的数值是同一个意思,因为在linux系统中一切皆是文件。  
    
  msg_send ( resource $queue , int $msgtype , mixed $message [, bool $serialize [, bool $blocking [, int &$errorcode ]]] )  
      顾名思义,该函数用来向消息队列中写数据。  
    
  msg_stat_queue ( resource $queue )   
      这个函数会返回消息队列的元数据。消息队列元数据中的信息很完整,包括了消息队列中待读取的消息数、最后读写队列的进程ID等。示例代码在第8行调用该函数返回的数组中队列中待读取的消息数msg_qnum值为0。  
    
  msg_receive ( resource $queue , int $desiredmsgtype , int &$msgtype , int $maxsize , mixed &$message [, bool $unserialize [, int $flags [, int &$errorcode ]]] )   
      msg_receive用于读取消息队列中的数据。  
    
  msg_remove_queue ( resource $queue )   
      msg_remove_queue用于销毁一个队列。  

示例代码_1只是展示了PHP操作消息队列函数的应用。下面的代码具体描述了进程间通信的场景



Java代码  <?php  
  $message_queue_key = ftok ( __FILE__, 'a' );  
  $message_queue = msg_get_queue ( $message_queue_key, 0666 );  
    
  $pids = array ();  
  for($i = 0; $i < 5; $i ++) {  
      //创建子进程  
      $pids [$i] = pcntl_fork ();  
        
      if ($pids [$i]) {  
          echo "No.$i child process was created, the pid is $pids[$i]\r\n";  
      } elseif ($pids [$i] == 0) {  
          $pid = posix_getpid ();  
          echo "process.$pid is writing now\r\n";  
           
          msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );  
          posix_kill ( $pid, SIGTERM );  
      }  
  }  
    
  do {  
      msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );  
      echo $message;  
    
      //需要判断队列是否为空,如果为空就退出  
  //break;  
  } while ( true )  
  ?>  

运行结果为:



Java代码  No.0 child process was created, the pid is 5249  
  No.1 child process was created, the pid is 5250  
  No.2 child process was created, the pid is 5251  
  No.3 child process was created, the pid is 5252  
  No.4 child process was created, the pid is 5253  
  process.5251 is writing now  
  this is process.5251's data  
  process.5253 is writing now  
  process.5252 is writing now  
  process.5250 is writing now  
  this is process.5253's data  
  this is process.5252's data  
  this is process.5250's data  
  process.5249 is writing now  
  this is process.5249's data  
  redis
http://www.neatstudio.com/show-976-1.shtml
  
  php自带的三个消息队列相关的函数
http://www.zhangguangda.com/?p=89

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-103647-1-1.html 上篇帖子: PHP导出excel文件的几种方式 下篇帖子: simple_html_dom.php 使用 乱码处理<作者:gaoming13>
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表