RabbitMQ 实现RPC
Server
import pika
class RPC_Server(object):
    """Blocking request/reply helper on top of a RabbitMQ direct exchange.

    Despite its name, this class is the side that *sends* the request: it
    publishes a message with routing key ``"rpc"`` on ``rpc_exchange2`` and
    then blocks until one message arrives on its own exclusive reply queue.

    NOTE(review): ``basic_consume(callback, no_ack=..., queue=...)`` is the
    pre-1.0 pika call style; pika >= 1.0 uses a different signature
    (``queue``, ``on_message_callback``, ``auto_ack``) -- confirm which
    version is installed before upgrading.
    """

    def __init__(self, host="192.168.20.180"):
        """Connect to the broker and set up a private reply queue.

        Args:
            host: Broker host name or address. Defaults to the address the
                original script was hard-wired to.
        """
        # Last reply body received by ``callback``; None until one arrives.
        self.response = None

        self.conn = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self.channel = self.conn.channel()
        self.channel.exchange_declare(exchange="rpc_exchange2", exchange_type="direct")

        # Broker-named exclusive queue; its name doubles as the routing key
        # the other side must use when replying (sent as ``reply_to``).
        result = self.channel.queue_declare(exclusive=True)
        self.queue_name = result.method.queue
        self.channel.queue_bind(exchange="rpc_exchange2",
                                routing_key=self.queue_name,
                                queue=self.queue_name)
        self.channel.basic_consume(self.callback, no_ack=True, queue=self.queue_name)
        print(self.queue_name)

    def callback(self, ch, method, properties, body):
        """Record the reply and break out of the consuming loop.

        Args:
            ch: Channel the message was delivered on (unused).
            method: Delivery metadata (unused).
            properties: Message properties (unused).
            body: Reply payload; printed and stored in ``self.response``.
        """
        print(body)
        self.response = body
        self.channel.stop_consuming()

    def call(self):
        """Send one request and block until a reply is delivered.

        Returns:
            The body of the reply message as passed to ``callback``.
        """
        # Reset so the value returned below belongs to this call only.
        self.response = None
        self.channel.basic_publish(exchange="rpc_exchange2",
                                   routing_key="rpc",
                                   properties=pika.BasicProperties(reply_to=self.queue_name,),
                                   body="Hello RPC Client...")
        # Blocks until ``callback`` calls stop_consuming().
        # NOTE(review): stop_consuming() presumably cancels the consumer
        # registered in __init__, which would make an instance single-use --
        # confirm against the installed pika version before reusing one.
        self.channel.start_consuming()
        return self.response

    def close(self):
        """Close the broker connection opened by ``__init__``."""
        self.conn.close()
# Interactive driver: each round opens a fresh RPC_Server (new connection and
# reply queue), waits for the user to press Enter, then performs one call.
while True:
    rpc = RPC_Server()
    try:
        input("...")
        rpc.call()
    finally:
        # A new connection is opened on every iteration; close it so that
        # connections and exclusive queues do not pile up on the broker.
        rpc.conn.close()
Client
import pika
# Responder setup: connect, make sure the shared direct exchange exists, and
# bind a broker-named exclusive queue to the "rpc" routing key so that
# requests published with that key are delivered here.
conn = pika.BlockingConnection(
    pika.ConnectionParameters(host="192.168.20.180")
)
channel = conn.channel()
channel.exchange_declare(
    exchange="rpc_exchange2",
    exchange_type="direct",
)

# The broker picks the queue name; read it back from the declare result.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(
    queue=queue_name,
    exchange="rpc_exchange2",
    routing_key="rpc",
)
def on_request(ch, method, properties, body):
    """Handle one incoming request and send the reply to the caller.

    Args:
        ch: Channel the request was delivered on; also used to publish
            the reply.
        method: Delivery metadata (unused).
        properties: Request properties; ``reply_to`` is used as the routing
            key of the reply and ``correlation_id`` is echoed back.
        body: Request payload; only printed.
    """
    print("body:", body)
    if properties.reply_to:
        ch.basic_publish(exchange="rpc_exchange2",
                         routing_key=properties.reply_to,
                         # Echo the caller's correlation id (None when the
                         # caller did not set one) so replies can be matched
                         # to requests.
                         properties=pika.BasicProperties(
                             correlation_id=properties.correlation_id),
                         body="Hello RPC Server...")
    else:
        # Without reply_to there is no routing key to answer on; skip the
        # publish instead of sending with routing_key=None.
        print("request has no reply_to; reply skipped")
    print(queue_name)
    # No explicit basic_ack here: this handler is registered with
    # no_ack=True, so deliveries are not acknowledged manually.
# Register the request handler and serve until interrupted.
# NOTE(review): prefetch_count presumably has no effect while the consumer
# runs with no_ack=True -- confirm against the RabbitMQ prefetch docs.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue=queue_name, no_ack=True)
try:
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the blocking loop; leave it cleanly.
    channel.stop_consuming()
finally:
    # Always release the broker connection, whatever ended the loop.
    conn.close()
页:
[1]