mrbear 发表于 2017-7-2 19:57:22

工作中使用RabbitMQ

  写一个基类



<?php

namespace BI\Service\RabbitMQJob;

use AMQPConnection;
use AMQPChannel;
use AMQPExchange;
use AMQPQueue;
use Exception;

abstract class Base{

   Const EXCHANGE_TYPE_DIRECCT = AMQP_EX_TYPE_DIRECT;
   Const EXCHANGE_TYPE_FANOUT= AMQP_EX_TYPE_FANOUT;
   Const EXCHANGE_TYPE_TOPIC   = AMQP_EX_TYPE_TOPIC;
   
   Const JOB_TYPE_MAIL = 'mail';
   Const JOB_TYPE_TEST = 'test';
   Const JOB_TYPE_STRUCTURE = 'structure';

   /** @varAMQPChannel */
   protected $channel;

   /** @varAMQPExchange */
   protected $exchange;

   public function __construct($config, $vhost = '/')
   {
         ($config);
         $connArgs = array('host' => $host, 'port' => $port, 'login' => $login, 'password' => $password, 'vhost' => $vhost);
         
         try {
             $conn = new AMQPConnection($connArgs);
             if (!$conn->connect()) {
               throw new Exception('Cannot connect to the broker with parameters: ' . json_encode($connArgs));
             }
             $this->channel = new AMQPChannel($conn);
             $this->declareExchange();
             $this->bindQueues();
         } catch (Exception $e) {
             throw new Exception($e->getMessage());
         }
   }

   protected function declareExchange(){
         $this->exchange = new AMQPExchange($this->channel);
         $this->exchange->setName($this->getExchangeName());
         $this->exchange->($this->getExchangeType());
         $this->exchange->setFlags(AMQP_DURABLE);
         if(!$this->exchange->declareExchange())
             throw new Exception('Exchange declare failed.');
   }

   protected function bindQueues(){
         foreach($this->getQueueNamesAndRoutingKeys() as $queueName => $routingKey){
             $queue = new AMQPQueue($this->channel);
             $queue->setName($queueName);
             $queue->setFlags(AMQP_DURABLE);
             $queue->declareQueue();
             if(!$queue->bind($this->getExchangeName(), $routingKey))
               throw new Exception('Queue binding failed with parameters: ',
                     json_encode(array('name' => $queueName, 'routingKey' => $routingKey)));
         }
   }

   /**
      * @param mixed $content
      * @param string $routingKey
      * @return bool
      * @throws Exception
      */
   protected function send($content, $routingKey = null){
         if(!($routingKey, $this->getQueueNamesAndRoutingKeys()))
             throw new Exception('RoutingKey: ' . $routingKey
               . ' is not found in the routing key list from the function getQueueNamesAndRoutingKeys');

         $jobType = $this->getRabbitMQJobType();
         if(!$this->validateJobType($jobType))
             throw new Exception('Invalid Job Type.');

         $message = array(
             'MType' => $jobType,
             'Content' => $content,
         );
         return $this->exchange->publish(json_encode($message), $routingKey);
   }
   
   protected function get($rk) {
         if (!($rk, $this->getQueueNamesAndRoutingKeys())) {
             throw new Exception("RoutingKey: $rk is not found");
         }
   }

   /**
      * @param string $jobType
      * @return bool
      */
   protected function validateJobType($jobType){
         return ($jobType, array(
             self::JOB_TYPE_MAIL,
             self::JOB_TYPE_TEST,
             self::JOB_TYPE_STRUCTURE,
         ));
   }

   function __destruct(){
         $this->channel->getConnection()->disconnect();
   }

   /**
      * @return string
      */
   abstract protected function getRabbitMQJobType();

   /**
      * @return string
      */
   abstract protected function getExchangeName();

   /**
      * @return string
      */
   abstract protected function getExchangeType();

   /**
      * @return array queue_name => routing_key
      */
   abstract protected function getQueueNamesAndRoutingKeys();
}
  写一个service继承基类



<?php

namespace Mission\Service;
use BI\Service\RabbitMQJob\Base;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;

class PublishToMQService extends Base
{
   private $message;
   private $logger;
   private $error;
   protected $queue = 'mission_queue';
   protected $routingKey = 'api_update_mission';

   /**
      * @return bool
      */
   public function publish()
   {
         if ( false === $this->_validation() )
             return false;

         $this->getLogger()->addDebug(__METHOD__, array('MQMessage' => $this->message));
         $this->sendToQueue($this->message, $this->queue);

         return true;
   }

   /**
      * @param array $message
      * @return $this
      */
   public function setMessage( $message = array() )
   {
         $this->message = $message;

         return $this;
   }

   /**
      * @param $queue
      * @return $this
      */
   public function setQueue( $queue )
   {
         $this->queue = $queue;

         return $this;
   }

   /**
      * @param $routingKey
      * @return $this
      */
   public function setRoutingKey( $routingKey )
   {
         $this->routingKey = $routingKey;

         return $this;
   }

   /**
      * @return Logger
      */
   private function getLogger()
   {
         if (!($this->logger instanceof Logger)) {
             $this->logger = new Logger('Detection');
             $file = __DIR__ . DIRECTORY_SEPARATOR . '../../../logs/queue.log';
             $this->logger->pushHandler(new StreamHandler( $file, Logger::DEBUG ));
         }
         return $this->logger;
   }

   /**
      * @return bool
      */
   private function _validation()
   {
         if ( empty( $this->message ) ) {
             $this->error = 'Message cannot be empty.';
             return false;
         }

         return true;
   }

   /**
      * @return string
      */
   protected function getExchangeName()
   {
         return 'API';
   }

   /**
      * @return string
      */
   protected function getRabbitMQJobType()
   {
         return Base::JOB_TYPE_TEST;
   }

   /**
      * @return string
      */
   protected function getExchangeType()
   {
         return parent::EXCHANGE_TYPE_DIRECCT;
   }

   /**
      * @return array queue_name => routing_key
      */
   protected function getQueueNamesAndRoutingKeys()
   {
         return array(
             $this->queue => $this->routingKey
         );
   }

   private function sendToQueue($content, $queueName)
   {
         $key = $this->getQueueNamesAndRoutingKeys();
         return parent::send($content, $key[$queueName]);
   }

   /**
      * @return mixed
      */
   public function getError()
   {
         return $this->error;
   }

}
  在代码层调用service



class AlarmController
{   
   const QUEUE = 'internal_message';
   const ROUTING_KEY = 'api_internal_message';
   public function checkTipAlarm()
   {
         //在线,写队列通知新站内信
         /**@var PublishToMQService $publishHandler*/
         $publishHandler = $this->get( 'mission.publish.RabbitMQ' );

         $message = array(
            'act' => self::ACT_SYSTEM_NEW_INMAIL,
            'psid' => (($this->request['to']) ? $this->request['to'] : ''),
            'uuid' => (!($this->request['to']) ? $this->request['to'] : ''),
            'data' => array(
                  'owner' => $this->uuid
            )
         );

         $publishHandler->setMessage( $message )->setRoutingKey( self::ROUTING_KEY )->setQueue( self::QUEUE );

         if ( false === $publishHandler->publish() ) {
             $this->error =array(
               'errorMsg' => $publishHandler->getError(),
               'errorCode' => 1
             );
             return false;
         }
   }
}
页: [1]
查看完整版本: 工作中使用RabbitMQ