RabbitMQ入门(PHP语言描述)
二 Work queues
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/new-task msg
*/
/**
 * Publishes a single task message to the "my_task_queue" work queue.
 *
 * The message is flagged as persistent (delivery_mode 2) and is routed
 * through the default exchange using the queue name as routing key.
 *
 * @param string $data Message body to enqueue.
 */
public function actionNewTask($data='Hello World!'){
    $conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $ch = $conn->channel();

    // Same declaration as the worker side, so either process may start first.
    $ch->queue_declare('my_task_queue', false, true, false, false);

    // delivery_mode 2 => make message persistent
    $properties = array('delivery_mode' => 2);
    $task = new AMQPMessage($data, $properties);
    $ch->basic_publish($task, '', 'my_task_queue');

    echo " Sent ", $data, "\n";

    $ch->close();
    $conn->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/worker
*/
/**
 * Consumes tasks from "my_task_queue" and acknowledges each one after
 * it has been processed.
 *
 * Work is simulated by sleeping one second per '.' character found in
 * the message body. basic_qos(null, 1, null) is set before consuming so
 * that at most one unacknowledged message is outstanding at a time.
 */
public function actionWorker(){
    $conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $ch = $conn->channel();
    $ch->queue_declare('my_task_queue', false, true, false, false);

    echo '
[*] Waiting for messages. To exit press CTRL+C', "\n";

    $handleTask = function($task){
        echo " Received ", $task->body, "\n";

        // Each dot in the body stands for one second of simulated work.
        $seconds = substr_count($task->body, '.');
        sleep($seconds);

        echo " Done", "\n";

        // Acknowledge on the channel the message was delivered on.
        $deliveryChannel = $task->delivery_info['channel'];
        $deliveryChannel->basic_ack($task->delivery_info['delivery_tag']);
    };

    $ch->basic_qos(null, 1, null);
    $ch->basic_consume('my_task_queue', '', false, false, false, false, $handleTask);

    // Keep dispatching for as long as a consumer callback is registered.
    while(count($ch->callbacks) > 0) {
        $ch->wait();
    }

    $ch->close();
    $conn->close();
}
三 Publish/Subscribe
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log msg
*/
/**
 * Broadcasts a log message through the "logs" fanout exchange.
 *
 * @param string $data Message body; when null or an empty string, the
 *                     text "info: Hello World!" is sent instead.
 */
public function actionEmitLog($data='Hello World!'){
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->exchange_declare('logs', 'fanout', false, false, false);

    // Compare explicitly instead of using empty(): empty("0") is true, which
    // would silently replace the legitimate message "0" with the default text.
    if($data === null || $data === '') {
        $data = "info: Hello World!";
    }

    $msg = new AMQPMessage($data);
    // No routing key is passed; the message is published to the "logs" exchange only.
    $channel->basic_publish($msg, 'logs');
    echo " Sent ", $data, "\n";

    $channel->close();
    $connection->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs
*/
/**
 * Prints every message published to the "logs" fanout exchange.
 *
 * A queue is declared with an empty name and the name returned by
 * queue_declare() is bound to the exchange; messages are consumed with
 * the no_ack flag set, so nothing is acknowledged explicitly.
 */
public function actionReceiveLogs(){
    $conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $ch = $conn->channel();
    $ch->exchange_declare('logs', 'fanout', false, false, false);

    // First element of the declare result is the queue name to bind and consume.
    list($logQueue) = $ch->queue_declare("", false, false, true, false);
    $ch->queue_bind($logQueue, 'logs');

    echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";

    $printLog = function($log){
        echo ' ', $log->body, "\n";
    };
    $ch->basic_consume($logQueue, '', false, true, false, false, $printLog);

    // Keep dispatching for as long as a consumer callback is registered.
    while(count($ch->callbacks) > 0) {
        $ch->wait();
    }

    $ch->close();
    $conn->close();
}
四 Routing
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/emit-log-direct info msg
*/
/**
 * Publishes a log message to the "direct_logs" direct exchange, using
 * the severity as routing key.
 *
 * @param string $argv Severity used as routing key; an empty value
 *                     falls back to 'info'.
 * @param string $data Message body.
 */
public function actionEmitLogDirect($argv, $data='Hello World!'){
    $conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $ch = $conn->channel();
    $ch->exchange_declare('direct_logs', 'direct', false, false, false);

    // Use the supplied severity unless it is empty.
    if(empty($argv)) {
        $severity = 'info';
    } else {
        $severity = $argv;
    }

    $log = new AMQPMessage($data);
    $ch->basic_publish($log, 'direct_logs', $severity);
    echo " Sent ",$severity,':',$data," \n";

    $ch->close();
    $conn->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/receive-logs-direct info,warning,error
*/
/**
 * Prints messages from the "direct_logs" direct exchange whose routing
 * key matches one of the requested severities.
 *
 * @param string $argv Comma-separated list of severities to bind,
 *                     e.g. "info,warning,error". Surrounding whitespace
 *                     and blank entries are ignored. When no severity
 *                     remains, a usage line is written to stderr and the
 *                     process exits with status 1.
 */
public function actionReceiveLogsDirect($argv){
    // explode() never returns an empty array ('' yields array('')), so the
    // list has to be filtered before the emptiness check can ever trigger.
    // 'strlen' keeps "0" while dropping empty strings.
    $severities = array_values(array_filter(
        array_map('trim', explode(',', (string) $argv)),
        'strlen'
    ));

    // Validate before connecting so the early exit leaves nothing open.
    if(empty($severities)) {
        file_put_contents('php://stderr', "Usage: yii tools/receive-logs-direct info,warning,error\n");
        exit(1);
    }

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

    // One binding per severity: the same queue receives all of them.
    foreach($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }

    echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";

    $callback = function($msg){
        echo ' ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    // Keep dispatching for as long as a consumer callback is registered.
    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
五 Topics
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-emit-log-direct info msg
*/
/**
 * Publishes a message to the "topic_logs" topic exchange.
 *
 * @param string $routing_key Routing key attached to the message.
 * @param string $data        Message body.
 */
public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){
    $conn = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $ch = $conn->channel();
    $ch->exchange_declare('topic_logs', 'topic', false, false, false);

    $log = new AMQPMessage($data);
    $ch->basic_publish($log, 'topic_logs', $routing_key);

    echo " Sent ",$routing_key,':',$data," \n";

    $ch->close();
    $conn->close();
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/topics-receive-logs-direct info,warning,error
*/
/**
 * Prints messages from the "topic_logs" topic exchange that match any
 * of the given binding keys.
 *
 * @param string $binding_keys Comma-separated list of binding keys.
 *                             Surrounding whitespace and blank entries
 *                             are ignored. When no key remains, a usage
 *                             line is written to stderr and the process
 *                             exits with status 1.
 */
public function actionTopicsReceiveLogsDirect($binding_keys=''){
    // explode() never returns an empty array ('' yields array('')), so the
    // list has to be filtered before the emptiness check can ever trigger.
    // A separate variable is used so the string parameter is not overwritten
    // with an array (the old usage message interpolated that array).
    $keys = array_values(array_filter(
        array_map('trim', explode(',', (string) $binding_keys)),
        'strlen'
    ));

    // Validate before connecting so the early exit leaves nothing open.
    if(empty($keys)) {
        file_put_contents('php://stderr', "Usage: yii tools/topics-receive-logs-direct <binding_key>[,<binding_key>...]\n");
        exit(1);
    }

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

    // One binding per key: the same queue receives everything that matches.
    foreach($keys as $binding_key) {
        $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    }

    echo '
[*] Waiting for logs. To exit press CTRL+C', "\n";

    $callback = function($msg){
        echo ' ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);

    // Keep dispatching for as long as a consumer callback is registered.
    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
六 RPC
生产者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-client 10
*/
/**
 * Requests a Fibonacci number through FibonacciRpcClient and prints the
 * reply.
 *
 * @param int $fib Argument forwarded to FibonacciRpcClient::call().
 */
public function actionRpcClient($fib=10){
    $client = new FibonacciRpcClient();

    // Resolve the call first so nothing is printed until the reply is in.
    $answer = $client->call($fib);

    echo " [.] Got ", $answer, "\n";
}
消费者:
/*
* php G:\wamp\www\mygedu\yii tools/rpc-server
*/
/**
 * Serves Fibonacci requests arriving on "rpc_queue".
 *
 * For every request the body is converted to an integer n, fib(n) is
 * computed, and the result is published to the queue named in the
 * request's reply_to property, carrying the request's correlation_id.
 * The request is acknowledged after the reply has been published.
 *
 * @param string $routing_key Not used by this action; kept so the
 *                            signature stays unchanged.
 * @param string $data        Not used by this action; kept so the
 *                            signature stays unchanged.
 */
public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('rpc_queue', false, false, false, false);

    // A local closure instead of a named function declared inside the method:
    // a named function would be re-declared (fatal error) if this action body
    // ran a second time in the same process. The iterative form also avoids
    // the exponential recursion, and n <= 0 yields 0 instead of recursing
    // without end on negative input.
    $fib = function($n) {
        if ($n <= 0) {
            return 0;
        }
        $previous = 0;
        $current = 1;
        for ($i = 1; $i < $n; $i++) {
            $next = $previous + $current;
            $previous = $current;
            $current = $next;
        }
        return $current;
    };

    echo " Awaiting RPC requests\n";

    $callback = function($req) use ($fib) {
        $n = intval($req->body);
        echo " [.] fib(", $n, ")\n";

        // Echo the correlation_id back so the caller can match the reply.
        $msg = new AMQPMessage(
            (string) $fib($n),
            array('correlation_id' => $req->get('correlation_id'))
        );

        // Reply through the default exchange to the caller's reply_to queue.
        $req->delivery_info['channel']->basic_publish(
            $msg, '', $req->get('reply_to'));
        $req->delivery_info['channel']->basic_ack(
            $req->delivery_info['delivery_tag']);
    };

    $channel->basic_qos(null, 1, null);
    $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

    // Keep dispatching for as long as a consumer callback is registered.
    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
}
相关类:
/**
 * Blocking RPC client for the Fibonacci service listening on "rpc_queue".
 *
 * The constructor declares a reply queue with a server-generated name and
 * starts consuming from it; call() publishes a request and waits until a
 * reply carrying the matching correlation id has arrived.
 */
class FibonacciRpcClient {
    private $connection;
    private $channel;
    // Name of the reply queue, as returned by queue_declare().
    private $callback_queue;
    // Body of the matching reply, or null while no reply has arrived.
    private $response;
    // Correlation id of the request currently in flight.
    private $corr_id;

    /**
     * Connects to the broker, declares the reply queue and registers
     * on_response() as its consumer callback.
     */
    public function __construct() {
        $this->connection = new AMQPStreamConnection(
            'localhost', 5672, 'guest', 'guest');
        $this->channel = $this->connection->channel();
        list($this->callback_queue, ,) = $this->channel->queue_declare(
            "", false, false, true, false);
        // Fourth argument (no_ack) is true: replies are never acknowledged by
        // this class, so consuming in manual-ack mode would leave every reply
        // unacknowledged for the lifetime of the channel.
        $this->channel->basic_consume(
            $this->callback_queue, '', false, true, false, false,
            array($this, 'on_response'));
    }

    /**
     * Consumer callback: stores the reply body when its correlation id
     * matches the request in flight; other replies are ignored.
     *
     * @param object $rep Delivered reply; must expose get('correlation_id')
     *                    and a public body property.
     */
    public function on_response($rep) {
        // Strict comparison: with ==, two different numeric-looking ids
        // (e.g. "1e3" and "1000") would be treated as equal.
        if($rep->get('correlation_id') === $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    /**
     * Sends $n to the server and blocks until the reply arrives.
     *
     * @param int $n Argument of the Fibonacci request.
     * @return int The reply body converted with intval().
     */
    public function call($n) {
        $this->response = null;
        $this->corr_id = uniqid();
        $msg = new AMQPMessage(
            (string) $n,
            array('correlation_id' => $this->corr_id,
                  'reply_to' => $this->callback_queue)
        );
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        // Compare against null rather than testing truthiness: the reply to
        // fib(0) is the string "0", which is falsy and would make a
        // while(!$this->response) loop wait forever.
        while($this->response === null) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}
页:
[1]