工作中使用RabbitMQ
写一个基类<?php
namespace BI\Service\RabbitMQJob;
use AMQPConnection;
use AMQPChannel;
use AMQPExchange;
use AMQPQueue;
use Exception;
abstract class Base{
Const EXCHANGE_TYPE_DIRECCT = AMQP_EX_TYPE_DIRECT;
Const EXCHANGE_TYPE_FANOUT= AMQP_EX_TYPE_FANOUT;
Const EXCHANGE_TYPE_TOPIC = AMQP_EX_TYPE_TOPIC;
Const JOB_TYPE_MAIL = 'mail';
Const JOB_TYPE_TEST = 'test';
Const JOB_TYPE_STRUCTURE = 'structure';
/** @varAMQPChannel */
protected $channel;
/** @varAMQPExchange */
protected $exchange;
public function __construct($config, $vhost = '/')
{
($config);
$connArgs = array('host' => $host, 'port' => $port, 'login' => $login, 'password' => $password, 'vhost' => $vhost);
try {
$conn = new AMQPConnection($connArgs);
if (!$conn->connect()) {
throw new Exception('Cannot connect to the broker with parameters: ' . json_encode($connArgs));
}
$this->channel = new AMQPChannel($conn);
$this->declareExchange();
$this->bindQueues();
} catch (Exception $e) {
throw new Exception($e->getMessage());
}
}
protected function declareExchange(){
$this->exchange = new AMQPExchange($this->channel);
$this->exchange->setName($this->getExchangeName());
$this->exchange->($this->getExchangeType());
$this->exchange->setFlags(AMQP_DURABLE);
if(!$this->exchange->declareExchange())
throw new Exception('Exchange declare failed.');
}
protected function bindQueues(){
foreach($this->getQueueNamesAndRoutingKeys() as $queueName => $routingKey){
$queue = new AMQPQueue($this->channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
if(!$queue->bind($this->getExchangeName(), $routingKey))
throw new Exception('Queue binding failed with parameters: ',
json_encode(array('name' => $queueName, 'routingKey' => $routingKey)));
}
}
/**
* @param mixed $content
* @param string $routingKey
* @return bool
* @throws Exception
*/
protected function send($content, $routingKey = null){
if(!($routingKey, $this->getQueueNamesAndRoutingKeys()))
throw new Exception('RoutingKey: ' . $routingKey
. ' is not found in the routing key list from the function getQueueNamesAndRoutingKeys');
$jobType = $this->getRabbitMQJobType();
if(!$this->validateJobType($jobType))
throw new Exception('Invalid Job Type.');
$message = array(
'MType' => $jobType,
'Content' => $content,
);
return $this->exchange->publish(json_encode($message), $routingKey);
}
protected function get($rk) {
if (!($rk, $this->getQueueNamesAndRoutingKeys())) {
throw new Exception("RoutingKey: $rk is not found");
}
}
/**
* @param string $jobType
* @return bool
*/
protected function validateJobType($jobType){
return ($jobType, array(
self::JOB_TYPE_MAIL,
self::JOB_TYPE_TEST,
self::JOB_TYPE_STRUCTURE,
));
}
function __destruct(){
$this->channel->getConnection()->disconnect();
}
/**
* @return string
*/
abstract protected function getRabbitMQJobType();
/**
* @return string
*/
abstract protected function getExchangeName();
/**
* @return string
*/
abstract protected function getExchangeType();
/**
* @return array queue_name => routing_key
*/
abstract protected function getQueueNamesAndRoutingKeys();
}
写一个service继承基类
<?php
namespace Mission\Service;
use BI\Service\RabbitMQJob\Base;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
class PublishToMQService extends Base
{
private $message;
private $logger;
private $error;
protected $queue = 'mission_queue';
protected $routingKey = 'api_update_mission';
/**
* @return bool
*/
public function publish()
{
if ( false === $this->_validation() )
return false;
$this->getLogger()->addDebug(__METHOD__, array('MQMessage' => $this->message));
$this->sendToQueue($this->message, $this->queue);
return true;
}
/**
* @param array $message
* @return $this
*/
public function setMessage( $message = array() )
{
$this->message = $message;
return $this;
}
/**
* @param $queue
* @return $this
*/
public function setQueue( $queue )
{
$this->queue = $queue;
return $this;
}
/**
* @param $routingKey
* @return $this
*/
public function setRoutingKey( $routingKey )
{
$this->routingKey = $routingKey;
return $this;
}
/**
* @return Logger
*/
private function getLogger()
{
if (!($this->logger instanceof Logger)) {
$this->logger = new Logger('Detection');
$file = __DIR__ . DIRECTORY_SEPARATOR . '../../../logs/queue.log';
$this->logger->pushHandler(new StreamHandler( $file, Logger::DEBUG ));
}
return $this->logger;
}
/**
* @return bool
*/
private function _validation()
{
if ( empty( $this->message ) ) {
$this->error = 'Message cannot be empty.';
return false;
}
return true;
}
/**
* @return string
*/
protected function getExchangeName()
{
return 'API';
}
/**
* @return string
*/
protected function getRabbitMQJobType()
{
return Base::JOB_TYPE_TEST;
}
/**
* @return string
*/
protected function getExchangeType()
{
return parent::EXCHANGE_TYPE_DIRECCT;
}
/**
* @return array queue_name => routing_key
*/
protected function getQueueNamesAndRoutingKeys()
{
return array(
$this->queue => $this->routingKey
);
}
private function sendToQueue($content, $queueName)
{
$key = $this->getQueueNamesAndRoutingKeys();
return parent::send($content, $key[$queueName]);
}
/**
* @return mixed
*/
public function getError()
{
return $this->error;
}
}
在代码层调用service
class AlarmController
{
const QUEUE = 'internal_message';
const ROUTING_KEY = 'api_internal_message';
public function checkTipAlarm()
{
//在线,写队列通知新站内信
/**@var PublishToMQService $publishHandler*/
$publishHandler = $this->get( 'mission.publish.RabbitMQ' );
$message = array(
'act' => self::ACT_SYSTEM_NEW_INMAIL,
'psid' => (($this->request['to']) ? $this->request['to'] : ''),
'uuid' => (!($this->request['to']) ? $this->request['to'] : ''),
'data' => array(
'owner' => $this->uuid
)
);
$publishHandler->setMessage( $message )->setRoutingKey( self::ROUTING_KEY )->setQueue( self::QUEUE );
if ( false === $publishHandler->publish() ) {
$this->error =array(
'errorMsg' => $publishHandler->getError(),
'errorCode' => 1
);
return false;
}
}
}
页:
[1]