设为首页 收藏本站
查看: 421|回复: 0

[经验分享] PHP并发 任务队列(2)

[复制链接]

尚未签到

发表于 2017-3-27 08:18:09 | 显示全部楼层 |阅读模式
  例如:邮件队列、任务队列、消息队列、Feed队列 ,手机短信,纪录日志,转换视频格式,数据挖掘采集为了增强用户体验可以用队列或异步处理
  PHP短耗时异步处理

<?php
echo '执行完毕提示';
fastcgi_finish_request(); /* 响应完成, 关闭连接 */
echo '不会输出';
sleep(3); //sleep模拟一些耗时的操作
/* 记录日志 */
file_put_contents('log.txt', '生存还是毁灭,这是个问题.');
?>
  这个部署有一个好处,就是可以不至于让一个页面的响应时间太久,直接返回给用户一个执行完毕的提示,但PHP后台实际还在继续执行,这样做可能会有用户体验上的舒服感。


  PHP 长耗时用队列
  http://blog.s135.com/httpsqs/
  HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务,使用 Tokyo Cabinet 的 B+Tree Key/Value 数据库来做数据的持久化存储。
  非常简单,基于 HTTP GET/POST 协议。PHP、Java、Perl、Shell、Python、Ruby等支持HTTP协议的编程语言均可调用
  https://git.oschina.net/664712890/PHP-Redis-Queue/tree/master
  内存队列

<?php  
/**  
* 使用共享内存的PHP循环内存队列实现  
* 支持多进程, 支持各种数据类型的存储  
* 注: 完成入队或出队操作,尽快使用unset(), 以释放临界区  
*  
* @author wangbinandi@gmail.com  
* @created 2009-12-23  
*/  
class SHMQueue  
{  
private $maxQSize = 0; // 队列最大长度  
private $front = 0; // 队头指针  
private $rear = 0;  // 队尾指针  
private $blockSize = 256;  // 块的大小(byte)  
private $memSize = 25600;  // 最大共享内存(byte)  
private $shmId = 0;  
private $filePtr = './shmq.ptr';  
private $semId = 0;  
public function __construct()  
{         
$shmkey = ftok(__FILE__, 't');  
$this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );  
$this->maxQSize = $this->memSize / $this->blockSize;  
// 申請一个信号量  
$this->semId = sem_get($shmkey, 1);  
sem_acquire($this->semId); // 申请进入临界区         
$this->init();  
}  
private function init()  
{  
if ( file_exists($this->filePtr) ){  
$contents = file_get_contents($this->filePtr);  
$data = explode( '|', $contents );  
if ( isset($data[0]) && isset($data[1])){  
$this->front = (int)$data[0];  
$this->rear  = (int)$data[1];  
}  
}  
}  
public function getLength()  
{  
return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;  
}  
public function enQueue( $value )  
{  
if ( $this->ptrInc($this->rear) == $this->front ){ // 队满  
return false;  
}  
$data = $this->encode($value);  
shmop_write($this->shmId, $data, $this->rear );  
$this->rear = $this->ptrInc($this->rear);  
return true;  
}  
public function deQueue()  
{  
if ( $this->front == $this->rear ){ // 队空  
return false;  
}  
$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);  
$this->front = $this->ptrInc($this->front);  
return $this->decode($value);  
}  
private function ptrInc( $ptr )  
{  
return ($ptr + $this->blockSize) % ($this->memSize);  
}  
private function encode( $value )  
{  
$data = serialize($value) . "__eof";  
if ( strlen($data) > $this->blockSize -1 ){  
throw new Exception(strlen($data)." is overload block size!");  
}  
return $data;  
}  
private function decode( $value )  
{  
$data = explode("__eof", $value);  
return unserialize($data[0]);         
}  
public function __destruct()  
{  
$data = $this->front . '|' . $this->rear;  
file_put_contents($this->filePtr, $data);  
sem_release($this->semId); // 出临界区, 释放信号量  
}  
}  
//使用的样例代码如下:
// 进队操作
$shmq = new SHMQueue();  
$data = 'test data';  
$shmq->enQueue($data);
$data = 'test';  
$shmq->enQueue($data);  
unset($shmq);  
// 出队操作  
$shmq = new SHMQueue();  
$data = $shmq->deQueue();  
unset($shmq);
memcache队列 可以使用比较专业的工具,例如:Apache ActiveMQ、memcacheq,下面是两个基本简单的实现方式:
<?php
/**
* PHP memcache 队列类
* @author LKK/lianq.net
* @version 0.3
* @修改说明:
* 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类.
* 2.队列默认先进先出,但增加了反向读取功能.
* @example:
$obj = new memcacheQueue('duilie');
for ($i = 1; $i <= 10; $i++) {
$obj->add($i . 'asdf');
}
$count = $obj->getQueueLength();
$list = $obj->read(10); //读数据10条,没有出栈
$poplist = $obj->get(8); //出栈
*/
class memcacheQueue
{
public static $client; //memcache客户端连接
public $access; //队列是否可更新
private $expire; //过期时间,秒,1~2592000,即30天内
private $sleepTime; //等待解锁时间,微秒
private $queueName; //队列名称,唯一值
private $retryNum; //重试次数,= 10 * 理论并发数
public $currentHead; //当前队首值
public $currentTail; //当前队尾值
const MAXNUM = 20000; //最大队列数,建议上限10K
const HEAD_KEY = '_lkkQueueHead_'; //队列首key
const TAIL_KEY = '_lkkQueueTail_'; //队列尾key
const VALU_KEY = '_lkkQueueValu_'; //队列值key
const LOCK_KEY = '_lkkQueueLock_'; //队列锁key
/**
* 构造函数
* @param string $queueName 队列名称
* @param int $expire 过期时间
* @param array $config memcache配置
*
* @return <type>
*/
public function __construct($queueName = '', $expire = 0, $config = '')
{
if (empty($config)) {
self::$client = memcache_pconnect('127.0.0.1', 11211);
} elseif (is_array($config)) { //array('host'=>'127.0.0.1','port'=>'11211')
self::$client = memcache_pconnect($config['host'], $config['port']);
} elseif (is_string($config)) { //"127.0.0.1:11211"
$tmp = explode(':', $config);
$conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
$conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
self::$client = memcache_pconnect($conf['host'], $conf['port']);
}
if (!self::$client) return false;
ignore_user_abort(true); //当客户断开连接,允许继续执行
set_time_limit(0); //取消脚本执行延时上限
$this->access = false;
$this->sleepTime = 1000;
$expire = empty($expire) ? 3600 : intval($expire) + 1;
$this->expire = $expire;
$this->queueName = $queueName;
$this->retryNum = 1000;
$this->head_key = $this->queueName . self::HEAD_KEY;
$this->tail_key = $this->queueName . self::TAIL_KEY;
$this->lock_key = $this->queueName . self::LOCK_KEY;
$this->_initSetHeadNTail();
}
/**
* 初始化设置队列首尾值
*/
private function _initSetHeadNTail()
{
//当前队列首的数值
$this->currentHead = memcache_get(self::$client, $this->head_key);
if ($this->currentHead === false) $this->currentHead = 0;
//当前队列尾的数值
$this->currentTail = memcache_get(self::$client, $this->tail_key);
if ($this->currentTail === false) $this->currentTail = 0;
}
/**
* 当取出元素时,改变队列首的数值
* @param int $step 步长值
*/
private function _changeHead($step = 1)
{
$this->currentHead += $step;
memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);
}
/**
* 当添加元素时,改变队列尾的数值
* @param int $step 步长值
* @param bool $reverse 是否反向
* @return null
*/
private function _changeTail($step = 1, $reverse = false)
{
if (!$reverse) {
$this->currentTail += $step;
} else {
$this->currentTail -= $step;
}
memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);
}
/**
* 队列是否为空
* @return bool
*/
private function _isEmpty()
{
return (bool)($this->currentHead === $this->currentTail);
}
/**
* 队列是否已满
* @return bool
*/
private function _isFull()
{
$len = $this->currentTail - $this->currentHead;
return (bool)($len === self::MAXNUM);
}
/**
* 队列加锁
*/
private function _getLock()
{
if ($this->access === false) {
while (!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire)) {
usleep($this->sleepTime);
@$i++;
if ($i > $this->retryNum) { //尝试等待N次
return false;
break;
}
}
$this->_initSetHeadNTail();
return $this->access = true;
}
return $this->access;
}
/**
* 队列解锁
*/
private function _unLock()
{
memcache_delete(self::$client, $this->lock_key, 0);
$this->access = false;
}
/**
* 获取当前队列的长度
* 该长度为理论长度,某些元素由于过期失效而丢失,真实长度<=该长度
* @return int
*/
public function getQueueLength()
{
$this->_initSetHeadNTail();
return intval($this->currentTail - $this->currentHead);
}
/**
* 添加队列数据
* @param void $data 要添加的数据
* @return bool
*/
public function add($data)
{
if (!$this->_getLock()) return false;
if ($this->_isFull()) {
$this->_unLock();
return false;
}
$value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail + 1);
$result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
if ($result) {
$this->_changeTail();
}
$this->_unLock();
return $result;
}
/**
* 读取队列数据
* @param int $length 要读取的长度(反向读取使用负数)
* @return array
*/
public function read($length = 0)
{
if (!is_numeric($length)) return false;
$this->_initSetHeadNTail();
if ($this->_isEmpty()) {
return false;
}
if (empty($length)) $length = self::MAXNUM; //默认所有
$keyArr = array();
if ($length > 0) { //正向读取(从队列首向队列尾)
$tmpMin = $this->currentHead;
$tmpMax = $tmpMin + $length;
for ($i = $tmpMin; $i <= $tmpMax; $i++) {
$keyArr[] = $this->queueName . self::VALU_KEY . $i;
}
} else { //反向读取(从队列尾向队列首)
$tmpMax = $this->currentTail;
$tmpMin = $tmpMax + $length;
for ($i = $tmpMax; $i > $tmpMin; $i--) {
$keyArr[] = $this->queueName . self::VALU_KEY . $i;
}
}
$result = @memcache_get(self::$client, $keyArr);
return $result;
}
/**
* 取出队列数据
* @param int $length 要取出的长度(反向读取使用负数)
* @return array
*/
public function get($length = 0)
{
if (!is_numeric($length)) return false;
if (!$this->_getLock()) return false;
if ($this->_isEmpty()) {
$this->_unLock();
return false;
}
if (empty($length)) $length = self::MAXNUM; //默认所有
$length = intval($length);
$keyArr = array();
if ($length > 0) { //正向读取(从队列首向队列尾)
$tmpMin = $this->currentHead;
$tmpMax = $tmpMin + $length;
for ($i = $tmpMin; $i <= $tmpMax; $i++) {
$keyArr[] = $this->queueName . self::VALU_KEY . $i;
}
$this->_changeHead($length);
} else { //反向读取(从队列尾向队列首)
$tmpMax = $this->currentTail;
$tmpMin = $tmpMax + $length;
for ($i = $tmpMax; $i > $tmpMin; $i--) {
$keyArr[] = $this->queueName . self::VALU_KEY . $i;
}
$this->_changeTail(abs($length), true);
}
$result = @memcache_get(self::$client, $keyArr);
foreach ($keyArr as $v) { //取出之后删除
@memcache_delete(self::$client, $v, 0);
}
$this->_unLock();
return $result;
}
/**
* 清空队列
*/
public function clear()
{
if (!$this->_getLock()) return false;
if ($this->_isEmpty()) {
$this->_unLock();
return false;
}
$tmpMin = $this->currentHead--;
$tmpMax = $this->currentTail++;
for ($i = $tmpMin; $i <= $tmpMax; $i++) {
$tmpKey = $this->queueName . self::VALU_KEY . $i;
@memcache_delete(self::$client, $tmpKey, 0);
}
$this->currentTail = $this->currentHead = 0;
memcache_set(self::$client, $this->head_key, $this->currentHead, false, $this->expire);
memcache_set(self::$client, $this->tail_key, $this->currentTail, false, $this->expire);
$this->_unLock();
}
/*
* 清除所有memcache缓存数据
*/
public function memFlush()
{
memcache_flush(self::$client);
}
}





 

 

 

 

 

 

 

 

 

 

 

 

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-355843-1-1.html 上篇帖子: (转)PHP缓存的实现 下篇帖子: qq 登录api php类
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表