diaoyudao 发表于 2018-12-15 12:43:34

RabbitMQ入门(PHP语言描述)

  二 Work queues
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/new-task msg
  */
  public function actionNewTask($data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->queue_declare('my_task_queue', false, true, false, false);
  $msg = new AMQPMessage($data,
  array('delivery_mode' => 2) # make message persistent
  );
  $channel->basic_publish($msg, '', 'my_task_queue');
  echo " Sent ", $data, "\n";
  $channel->close();
  $connection->close();
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/worker
  */
  public function actionWorker(){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->queue_declare('my_task_queue', false, true, false, false);
  echo '
[*] Waiting for messages. To exit press CTRL+C', "\n";
  $callback = function($msg){
  echo " Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  };
  $channel->basic_qos(null, 1, null);
  $channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);
  while(count($channel->callbacks)) {
  $channel->wait();
  }
  $channel->close();
  $connection->close();
  }
  三 Publish/Subscribe
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/emit-log msg
  */
  public function actionEmitLog($data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('logs', 'fanout', false, false, false);
  if(empty($data)) $data = "info: Hello World!";
  $msg = new AMQPMessage($data);
  $channel->basic_publish($msg, 'logs');
  echo " Sent ", $data, "\n";
  $channel->close();
  $connection->close();
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/receive-logs
  */
  public function actionReceiveLogs(){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('logs', 'fanout', false, false, false);
  list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  $channel->queue_bind($queue_name, 'logs');
  echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";
  $callback = function($msg){
  echo ' ', $msg->body, "\n";
  };
  $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  while(count($channel->callbacks)) {
  $channel->wait();
  }
  $channel->close();
  $connection->close();
  }
  四 Routing
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg
  */
  public function actionEmitLogDirect($argv, $data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  $severity = isset($argv) && !empty($argv) ? $argv : 'info';
  $msg = new AMQPMessage($data);
  $channel->basic_publish($msg, 'direct_logs', $severity);
  echo " Sent ",$severity,':',$data," \n";
  $channel->close();
  $connection->close();
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error
  */
  public function actionReceiveLogsDirect($argv){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('direct_logs', 'direct', false, false, false);
  list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  $severities = explode(',', $argv);
  if(empty($severities)) {
  file_put_contents('php://stderr', "Usage: $argv \n");
  exit(1);
  }
  foreach($severities as $severity) {
  $channel->queue_bind($queue_name, 'direct_logs', $severity);
  }
  echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";
  $callback = function($msg){
  echo ' ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  };
  $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  while(count($channel->callbacks)) {
  $channel->wait();
  }
  $channel->close();
  $connection->close();
  }
  五 Topics
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg
  */
  public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('topic_logs', 'topic', false, false, false);
  $msg = new AMQPMessage($data);
  $channel->basic_publish($msg, 'topic_logs', $routing_key);
  echo " Sent ",$routing_key,':',$data," \n";
  $channel->close();
  $connection->close();
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error
  */
  public function actionTopicsReceiveLogsDirect($binding_keys=''){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->exchange_declare('topic_logs', 'topic', false, false, false);
  list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
  $binding_keys = explode(',', $binding_keys);
  if( empty($binding_keys )) {
  file_put_contents('php://stderr', "Usage: $binding_keys\n");
  exit(1);
  }
  foreach($binding_keys as $binding_key) {
  $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
  }
  echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";
  $callback = function($msg){
  echo ' ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
  };
  $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
  while(count($channel->callbacks)) {
  $channel->wait();
  }
  $channel->close();
  $connection->close();
  }
  六 RPC
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/rpc-client 10
  */
  public function actionRpcClient($fib=10){
  $fibonacci_rpc = new FibonacciRpcClient();
  $response = $fibonacci_rpc->call($fib);
  echo " [.] Got ", $response, "\n";
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/rpc-server
  */
  public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->queue_declare('rpc_queue', false, false, false, false);
  function fib($n) {
  if ($n == 0)
  return 0;
  if ($n == 1)
  return 1;
  return fib($n-1) + fib($n-2);
  }
  echo " Awaiting RPC requests\n";
  $callback = function($req) {
  $n = intval($req->body);
  echo " [.] fib(", $n, ")\n";
  $msg = new AMQPMessage(
  (string) fib($n),
  array('correlation_id' => $req->get('correlation_id'))
  );
  $req->delivery_info['channel']->basic_publish(
  $msg, '', $req->get('reply_to'));
  $req->delivery_info['channel']->basic_ack(
  $req->delivery_info['delivery_tag']);
  };
  $channel->basic_qos(null, 1, null);
  $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
  while(count($channel->callbacks)) {
  $channel->wait();
  }
  $channel->close();
  $connection->close();
  }
  相关类:
  class FibonacciRpcClient {
  private $connection;
  private $channel;
  private $callback_queue;
  private $response;
  private $corr_id;
  public function __construct() {
  $this->connection = new AMQPStreamConnection(
  'localhost', 5672, 'guest', 'guest');
  $this->channel = $this->connection->channel();
  list($this->callback_queue, ,) = $this->channel->queue_declare(
  "", false, false, true, false);
  $this->channel->basic_consume(
  $this->callback_queue, '', false, false, false, false,
  array($this, 'on_response'));
  }
  public function on_response($rep) {
  if($rep->get('correlation_id') == $this->corr_id) {
  $this->response = $rep->body;
  }
  }
  public function call($n) {
  $this->response = null;
  $this->corr_id = uniqid();
  $msg = new AMQPMessage(
  (string) $n,
  array('correlation_id' => $this->corr_id,
  'reply_to' => $this->callback_queue)
  );
  $this->channel->basic_publish($msg, '', 'rpc_queue');
  while(!$this->response) {
  $this->channel->wait();
  }
  return intval($this->response);
  }
  }

页: [1]
查看完整版本: RabbitMQ入门(PHP语言描述)