设为首页 收藏本站
查看: 513|回复: 0

[经验分享] RabbitMQ入门(PHP语言描述)

[复制链接]

尚未签到

发表于 2018-12-15 12:43:34 | 显示全部楼层 |阅读模式
  二 Work queues
  生产者:
  /*
  * php G:\wamp\www\mygedu\yii tools/new-task msg
  */
  public function actionNewTask($data='Hello World!'){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->queue_declare('my_task_queue', false, true, false, false);
  $msg = new AMQPMessage($data,
  array('delivery_mode' => 2) # make message persistent
  );
  $channel->basic_publish($msg, '', 'my_task_queue');
  echo " [x] Sent ", $data, "\n";
  $channel->close();
  $connection->close();
  }
  消费者:
  /*
  * php G:\wamp\www\mygedu\yii tools/worker
  */
  public function actionWorker(){
  $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
  $channel = $connection->channel();
  $channel->queue_declare('my_task_queue', false, true, false, false);
  echo '
  • Waiting for messages. To exit press CTRL+C', "\n";
      $callback = function($msg){
      echo " [x] Received ", $msg->body, "\n";
      sleep(substr_count($msg->body, '.'));
      echo " [x] Done", "\n";
      $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
      };
      $channel->basic_qos(null, 1, null);
      $channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);
      while(count($channel->callbacks)) {
      $channel->wait();
      }
      $channel->close();
      $connection->close();
      }
      三 Publish/Subscribe
      生产者:
      /*
      * php G:\wamp\www\mygedu\yii tools/emit-log msg
      */
      public function actionEmitLog($data='Hello World!'){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('logs', 'fanout', false, false, false);
      if(empty($data)) $data = "info: Hello World!";
      $msg = new AMQPMessage($data);
      $channel->basic_publish($msg, 'logs');
      echo " [x] Sent ", $data, "\n";
      $channel->close();
      $connection->close();
      }
      消费者:
      /*
      * php G:\wamp\www\mygedu\yii tools/receive-logs
      */
      public function actionReceiveLogs(){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('logs', 'fanout', false, false, false);
      list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
      $channel->queue_bind($queue_name, 'logs');
      echo '
  • Waiting for logs. To exit press CTRL+C', "\n";
      $callback = function($msg){
      echo ' [x] ', $msg->body, "\n";
      };
      $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
      while(count($channel->callbacks)) {
      $channel->wait();
      }
      $channel->close();
      $connection->close();
      }
      四 Routing
      生产者:
      /*
      * php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg
      */
      public function actionEmitLogDirect($argv, $data='Hello World!'){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('direct_logs', 'direct', false, false, false);
      $severity = isset($argv) && !empty($argv) ? $argv : 'info';
      $msg = new AMQPMessage($data);
      $channel->basic_publish($msg, 'direct_logs', $severity);
      echo " [x] Sent ",$severity,':',$data," \n";
      $channel->close();
      $connection->close();
      }
      消费者:
      /*
      * php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error
      */
      public function actionReceiveLogsDirect($argv){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('direct_logs', 'direct', false, false, false);
      list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
      $severities = explode(',', $argv);
      if(empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
      }
      foreach($severities as $severity) {
      $channel->queue_bind($queue_name, 'direct_logs', $severity);
      }
      echo '
  • Waiting for logs. To exit press CTRL+C', "\n";
      $callback = function($msg){
      echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
      };
      $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
      while(count($channel->callbacks)) {
      $channel->wait();
      }
      $channel->close();
      $connection->close();
      }
      五 Topics
      生产者:
      /*
      * php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg
      */
      public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('topic_logs', 'topic', false, false, false);
      $msg = new AMQPMessage($data);
      $channel->basic_publish($msg, 'topic_logs', $routing_key);
      echo " [x] Sent ",$routing_key,':',$data," \n";
      $channel->close();
      $connection->close();
      }
      消费者:
      /*
      * php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error
      */
      public function actionTopicsReceiveLogsDirect($binding_keys=''){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->exchange_declare('topic_logs', 'topic', false, false, false);
      list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
      $binding_keys = explode(',', $binding_keys);
      if( empty($binding_keys )) {
      file_put_contents('php://stderr', "Usage: $binding_keys\n");
      exit(1);
      }
      foreach($binding_keys as $binding_key) {
      $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
      }
      echo '
  • Waiting for logs. To exit press CTRL+C', "\n";
      $callback = function($msg){
      echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
      };
      $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
      while(count($channel->callbacks)) {
      $channel->wait();
      }
      $channel->close();
      $connection->close();
      }
      六 RPC
      生产者:
      /*
      * php G:\wamp\www\mygedu\yii tools/rpc-client 10
      */
      public function actionRpcClient($fib=10){
      $fibonacci_rpc = new FibonacciRpcClient();
      $response = $fibonacci_rpc->call($fib);
      echo " [.] Got ", $response, "\n";
      }
      消费者:
      /*
      * php G:\wamp\www\mygedu\yii tools/rpc-server
      */
      public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      $channel->queue_declare('rpc_queue', false, false, false, false);
      function fib($n) {
      if ($n == 0)
      return 0;
      if ($n == 1)
      return 1;
      return fib($n-1) + fib($n-2);
      }
      echo " [x] Awaiting RPC requests\n";
      $callback = function($req) {
      $n = intval($req->body);
      echo " [.] fib(", $n, ")\n";
      $msg = new AMQPMessage(
      (string) fib($n),
      array('correlation_id' => $req->get('correlation_id'))
      );
      $req->delivery_info['channel']->basic_publish(
      $msg, '', $req->get('reply_to'));
      $req->delivery_info['channel']->basic_ack(
      $req->delivery_info['delivery_tag']);
      };
      $channel->basic_qos(null, 1, null);
      $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
      while(count($channel->callbacks)) {
      $channel->wait();
      }
      $channel->close();
      $connection->close();
      }
      相关类:
      class FibonacciRpcClient {
      private $connection;
      private $channel;
      private $callback_queue;
      private $response;
      private $corr_id;
      public function __construct() {
      $this->connection = new AMQPStreamConnection(
      'localhost', 5672, 'guest', 'guest');
      $this->channel = $this->connection->channel();
      list($this->callback_queue, ,) = $this->channel->queue_declare(
      "", false, false, true, false);
      $this->channel->basic_consume(
      $this->callback_queue, '', false, false, false, false,
      array($this, 'on_response'));
      }
      public function on_response($rep) {
      if($rep->get('correlation_id') == $this->corr_id) {
      $this->response = $rep->body;
      }
      }
      public function call($n) {
      $this->response = null;
      $this->corr_id = uniqid();
      $msg = new AMQPMessage(
      (string) $n,
      array('correlation_id' => $this->corr_id,
      'reply_to' => $this->callback_queue)
      );
      $this->channel->basic_publish($msg, '', 'rpc_queue');
      while(!$this->response) {
      $this->channel->wait();
      }
      return intval($this->response);
      }
      }


  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-651692-1-1.html 上篇帖子: PHP中面向对象的数据库操作类 下篇帖子: PHP+Ajax异步带进度条上传文件
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表