设为首页 收藏本站
查看: 471|回复: 0

[经验分享] 工作中使用RabbitMQ

[复制链接]

尚未签到

发表于 2017-7-2 19:57:22 | 显示全部楼层 |阅读模式
  写一个基类



<?php

namespace BI\Service\RabbitMQJob;

use AMQPConnection;
use AMQPChannel;
use AMQPExchange;
use AMQPQueue;
use Exception;

abstract class Base{

     Const EXCHANGE_TYPE_DIRECCT = AMQP_EX_TYPE_DIRECT;
     Const EXCHANGE_TYPE_FANOUT  = AMQP_EX_TYPE_FANOUT;
     Const EXCHANGE_TYPE_TOPIC   = AMQP_EX_TYPE_TOPIC;
     
     Const JOB_TYPE_MAIL = 'mail';
     Const JOB_TYPE_TEST = 'test';
     Const JOB_TYPE_STRUCTURE = 'structure';

     /** @var  AMQPChannel */
     protected $channel;

     /** @var  AMQPExchange */
     protected $exchange;

     public function __construct($config, $vhost = '/')
     {
         ($config);
         $connArgs = array('host' => $host, 'port' => $port, 'login' => $login, 'password' => $password, 'vhost' => $vhost);
         
         try {
             $conn = new AMQPConnection($connArgs);
             if (!$conn->connect()) {
                 throw new Exception('Cannot connect to the broker with parameters: ' . json_encode($connArgs));
             }
             $this->channel = new AMQPChannel($conn);
             $this->declareExchange();
             $this->bindQueues();
         } catch (Exception $e) {
             throw new Exception($e->getMessage());
         }
     }

     protected function declareExchange(){
         $this->exchange = new AMQPExchange($this->channel);
         $this->exchange->setName($this->getExchangeName());
         $this->exchange->($this->getExchangeType());
         $this->exchange->setFlags(AMQP_DURABLE);
         if(!$this->exchange->declareExchange())
             throw new Exception('Exchange declare failed.');
     }

     protected function bindQueues(){
         foreach($this->getQueueNamesAndRoutingKeys() as $queueName => $routingKey){
             $queue = new AMQPQueue($this->channel);
             $queue->setName($queueName);
             $queue->setFlags(AMQP_DURABLE);
             $queue->declareQueue();
             if(!$queue->bind($this->getExchangeName(), $routingKey))
                 throw new Exception('Queue binding failed with parameters: ',
                     json_encode(array('name' => $queueName, 'routingKey' => $routingKey)));
         }
     }

     /**
      * @param mixed $content
      * @param string $routingKey
      * @return bool
      * @throws Exception
      */
     protected function send($content, $routingKey = null){
         if(!($routingKey, $this->getQueueNamesAndRoutingKeys()))
             throw new Exception('RoutingKey: ' . $routingKey
                 . ' is not found in the routing key list from the function getQueueNamesAndRoutingKeys');

         $jobType = $this->getRabbitMQJobType();
         if(!$this->validateJobType($jobType))
             throw new Exception('Invalid Job Type.');

         $message = array(
             'MType' => $jobType,
             'Content' => $content,
         );
         return $this->exchange->publish(json_encode($message), $routingKey);
     }
     
     protected function get($rk) {
         if (!($rk, $this->getQueueNamesAndRoutingKeys())) {
             throw new Exception("RoutingKey: $rk is not found");
         }
     }

     /**
      * @param string $jobType
      * @return bool
      */
     protected function validateJobType($jobType){
         return ($jobType, array(
             self::JOB_TYPE_MAIL,
             self::JOB_TYPE_TEST,
             self::JOB_TYPE_STRUCTURE,
         ));
     }

     function __destruct(){
         $this->channel->getConnection()->disconnect();
     }

     /**
      * @return string
      */
     abstract protected function getRabbitMQJobType();

     /**
      * @return string
      */
     abstract protected function getExchangeName();

     /**
      * @return string
      */
     abstract protected function getExchangeType();

     /**
      * @return array queue_name => routing_key
      */
     abstract protected function getQueueNamesAndRoutingKeys();
}
  写一个service继承基类



<?php

namespace Mission\Service;
use BI\Service\RabbitMQJob\Base;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;

class PublishToMQService extends Base
{
     private $message;
     private $logger;
     private $error;
     protected $queue = 'mission_queue';
     protected $routingKey = 'api_update_mission';

     /**
      * @return bool
      */
     public function publish()
     {
         if ( false === $this->_validation() )
             return false;

         $this->getLogger()->addDebug(__METHOD__, array('MQMessage' => $this->message));
         $this->sendToQueue($this->message, $this->queue);

         return true;
     }

     /**
      * @param array $message
      * @return $this
      */
     public function setMessage( $message = array() )
     {
         $this->message = $message;

         return $this;
     }

     /**
      * @param $queue
      * @return $this
      */
     public function setQueue( $queue )
     {
         $this->queue = $queue;

         return $this;
     }

     /**
      * @param $routingKey
      * @return $this
      */
     public function setRoutingKey( $routingKey )
     {
         $this->routingKey = $routingKey;

         return $this;
     }

     /**
      * @return Logger
      */
     private function getLogger()
     {
         if (!($this->logger instanceof Logger)) {
             $this->logger = new Logger('Detection');
             $file = __DIR__ . DIRECTORY_SEPARATOR . '../../../logs/queue.log';
             $this->logger->pushHandler(new StreamHandler( $file, Logger::DEBUG ));
         }
         return $this->logger;
     }

     /**
      * @return bool
      */
     private function _validation()
     {
         if ( empty( $this->message ) ) {
             $this->error = 'Message cannot be empty.';
             return false;
         }

         return true;
     }

     /**
      * @return string
      */
     protected function getExchangeName()
     {
         return 'API';
     }

     /**
      * @return string
      */
     protected function getRabbitMQJobType()
     {
         return Base::JOB_TYPE_TEST;
     }

     /**
      * @return string
      */
     protected function getExchangeType()
     {
         return parent::EXCHANGE_TYPE_DIRECCT;
     }

     /**
      * @return array queue_name => routing_key
      */
     protected function getQueueNamesAndRoutingKeys()
     {
         return array(
             $this->queue => $this->routingKey
         );
     }

     private function sendToQueue($content, $queueName)
     {
         $key = $this->getQueueNamesAndRoutingKeys();
         return parent::send($content, $key[$queueName]);
     }

     /**
      * @return mixed
      */
     public function getError()
     {
         return $this->error;
     }

}
  在代码层调用service



class AlarmController
{   
     const QUEUE = 'internal_message';
     const ROUTING_KEY = 'api_internal_message';
     public function checkTipAlarm()
     {
         //在线,写队列通知新站内信
         /**@var PublishToMQService $publishHandler*/
         $publishHandler = $this->get( 'mission.publish.RabbitMQ' );

         $message = array(
              'act' => self::ACT_SYSTEM_NEW_INMAIL,
              'psid' => (($this->request['to']) ? $this->request['to'] : ''),
              'uuid' => (!($this->request['to']) ? $this->request['to'] : ''),
              'data' => array(
                  'owner' => $this->uuid
              )
         );

         $publishHandler->setMessage( $message )->setRoutingKey( self::ROUTING_KEY )->setQueue( self::QUEUE );

         if ( false === $publishHandler->publish() ) {
             $this->error =  array(
                 'errorMsg' => $publishHandler->getError(),
                 'errorCode' => 1
             );
             return false;
         }
     }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390558-1-1.html 上篇帖子: 【Ubuntu 16】安装ssh 下篇帖子: 数据结构和算法-贪婪算法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表