设为首页 收藏本站
查看: 1246|回复: 0

[经验分享] 那些年被我坑过的Python——第十章Broker(rabbitMQ/redis)

[复制链接]

尚未签到

发表于 2017-7-4 18:58:28 | 显示全部楼层 |阅读模式
  基于RabbitMQ的direct任务驱动异步RPC程序实现:
  RPC_dispatcher指令分发器:


DSC0000.gif DSC0001.gif


#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = "Zhang Xuyao"
import pika
import uuid
import time
import threading

class RpcDispatcher(object):
     def __init__(self, rMQ_addr):
         self.cmd_list = [
             "ls", "df", "free",
             "ip", "ifconfig", "tail",
             "head", "grep", "uptime",
             "date"
         ]
         self.task_dict = {}
         self.task_fin_dict = {}
         self.routing_key_list = []

         self.connection = pika.BlockingConnection(
             pika.ConnectionParameters(host=rMQ_addr)
         )
         self.channel = self.connection.channel()

         # 收数据用
         recv_queue = self.channel.queue_declare(exclusive=True)
         self.recv_queue_name = recv_queue.method.queue
         self.channel.basic_consume(self._on_response, queue=self.recv_queue_name)

         # 发数据用
         self.channel.exchange_declare(exchange='send', type='direct')
         self.send_queue = self.channel.queue_declare()
         self.send_queue_name = self.send_queue.method.queue

     # 获取指定通道的响应(4)###
     def _on_response(self, ch, method, parameters, msg):
         if parameters.correlation_id in self.task_dict:
             self.task_dict[parameters.correlation_id][parameters.app_id]["response"] = msg.decode("utf-8")
             ch.basic_ack(delivery_tag=method.delivery_tag)
             self.task_dict[parameters.correlation_id][parameters.app_id]["recv_time"] = time.time()
             fin_flag = False
             for host in self.task_dict[parameters.correlation_id]:
                 if self.task_dict[parameters.correlation_id][host]["response"] != None:
                     continue
                 else:

                     break
             # 执行结果全部收到就把记录转移到已经完成容器中,根据检查已经完成容器中的内容用于确定任务的状态
             else:
                 self.task_fin_dict[parameters.correlation_id] = self.task_dict[parameters.correlation_id]
                 del self.task_dict[parameters.correlation_id]

     # 发送请求(2)######
     def _on_request(self, input_cmd, host_list):
         print(RpcDispatcher.colorStr("[x]Requesting>>: '%s' on %s"
                                      % (cmd, tuple(host_list)), 33))
         self.response = None
         # 生成全局校验码
         corr_id = str(uuid.uuid4())
         print(RpcDispatcher.colorStr("[x]Task_id>>: %s"
                                      % corr_id, 34))
         self.task_dict[corr_id] = {}
         if host_list:
             for host in host_list:
                 self.task_dict[corr_id][host] = {
                     "cmd": input_cmd,
                     "response": None,
                     "req_time": time.time(),
                     "recv_time": None,
                 }
                 # 绑定routing_key准备并发布消息
                 self.channel.queue_bind(exchange='send',
                                         queue=self.send_queue_name,
                                         routing_key=host)
                 # 向执行器并发布消息指令
                 self.channel.basic_publish(exchange='send',
                                            routing_key=host,
                                            properties=pika.BasicProperties(
                                                reply_to=self.recv_queue_name,
                                                correlation_id=corr_id,
                                            ),
                                            body=str(input_cmd))
                 # 消息发布后解除routing_key绑定
                 self.channel.queue_unbind(exchange='send',
                                           queue=self.send_queue_name,
                                           routing_key=host)

             # 守护线程负责不断检测响应结果是否全部收到
             on_recv_thread = threading.Thread(target=self._on_recv, args=[corr_id, ])
             on_recv_thread.setDaemon(True)
             on_recv_thread.start()

     # 等待数据消息(3)
     def _on_recv(self, task_id):
         # 根据检查已经完成容器中的指定task_id是否存在来确定任务的状态,为空则继续收取消息
         while task_id not in self.task_fin_dict:
             self.connection.process_data_events()

     # 显示已经完成的任务编号(5)
     def show_task_fin(self):
         print("尚未查看的任务:")
         for task_id in self.task_fin_dict:
             value_list = tuple(self.task_fin_dict[task_id].values())
             host_list = tuple(self.task_fin_dict[task_id].keys())
             cmd = str(value_list[0]["cmd"])
             print(RpcDispatcher.colorStr("[task_id]: %s | [cmd_info]: '%s' | [host_list]: %s"
                                          % (task_id, cmd, host_list), 32))

     # 获取指定任务的执行结果(6)
     def get_response(self, task_id):
         if task_id in self.task_fin_dict:
             for host in self.task_fin_dict[task_id]:
                 response = self.task_fin_dict[task_id][host]["response"]
                 cmd_req = self.task_fin_dict[task_id][host]["cmd"]
                 time_cost = self.task_fin_dict[task_id][host]["recv_time"] - \
                             self.task_fin_dict[task_id][host]["req_time"]
                 time_cost = round(time_cost, 3)
                 print(RpcDispatcher.colorStr("Host: %s           | Cmd: '%s' \nTime Cost: %ss | Response: "
                                              % (host, cmd_req, time_cost), 33))
                 print(RpcDispatcher.colorStr(response, 36))
             del self.task_fin_dict[task_id]
         else:
             print("任务结果尚未全部返回")

     # 接收外部输入,调用请求(1)
     def call(self, cmd, host_list):
         return self._on_request(cmd, host_list)

     @staticmethod
     def colorStr(aStr, color_code):
         return "\033[0;" + str(color_code) + ";0m" + aStr + "\033[0m"
     def __del__(self):
         self.connection.close()

if __name__ == '__main__':
     cmd_rpc = RpcDispatcher(rMQ_addr="localhost")
     while True:
         cmd = input("[$]Cmd>>:")
         if cmd.lower() != 'eof' and cmd.lower() != 'exit':
             if cmd.split()[0].lower() in ["$s"]:
                 cmd_rpc.show_task_fin()
             elif cmd.split()[0].lower() in ["$c"] and len(cmd.split()) == 2:
                 cmd_rpc.get_response(cmd.split()[1])
             else:
                 cmd_split = cmd.split()
                 has_host = cmd_split.count("--hosts")
                 if has_host != 1:
                     print(cmd_rpc.colorStr("Usage <cmd> --hosts <ip1>[,<ip2>[,<ip3>...]]", 35))
                     continue
                 else:
                     if len(cmd_split) <= cmd_split.index("--hosts") + 1:
                         print("请至少指定一个主机IP")
                         continue
                     host_list = cmd_split[cmd_split.index("--hosts") + 1].split(',')
                     cmd = " ".join(cmd_split[0:cmd_split.index("--hosts")]).strip()

                     if cmd.split()[0] in cmd_rpc.cmd_list:
                         cmd_rpc.call(cmd, host_list)
                     else:
                         print("您输入的命令暂不支持...")
                         continue
         else:
             break
RPC 指令分发器代码  RPC_executor指令执行器:





#!/usr/bin/env python
# -*- coding:utf-8 -*-
__Author__ = "Zhang Xuyao"
import pika
import os, time

class RpcExecutor(object):
     def __init__(self, rMQ_addr, ip):
         self.connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=rMQ_addr))
         self.channel = self.connection.channel()

         # 接收命令用
         self.ip = ip
         self.channel.exchange_declare(exchange='send', type='direct')
         self.send_queue = self.channel.queue_declare()
         self.send_queue_name = self.send_queue.method.queue
         self.channel.queue_bind(exchange='send', routing_key=self.ip, queue=self.send_queue_name)
         self.channel.basic_consume(self._on_request, queue=self.send_queue_name)

     # 开始订阅消息
     def run(self):
         print(" [x] Awaiting RPC requests")
         self.channel.start_consuming()

     # 执行消息命令
     def exec_cmd(self, cmd_str):
         result = os.popen(cmd_str).read()
         if not result:
             return "命令没有输出结果"
         else:
             return result

     # 请求事件的回调函数
     def _on_request(self, ch, method, props, body):
         cmd_str = body.decode('utf-8')
         print(" [.] exec_cmd(%s)" % cmd_str)
         response = self.exec_cmd(cmd_str)

         # 发送命令结果,通过传来的queue进行发布,这里的app_id是额外增加的IP信息,区分于其他执行器的响应结果
         ch.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=props.correlation_id,
                              app_id=self.ip
                          ),
                          body=str(response)
                          )
         # 消息反馈确认,确保对方确实收到了响应结果
         ch.basic_ack(delivery_tag=method.delivery_tag)

     def __del__(self):
         self.connection.close()

if __name__ == '__main__':
     ip = input('请输入IP地址>>:')
     cmd_executor = RpcExecutor('localhost', ip)
     cmd_executor.run()
RPC 执行器代码

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390753-1-1.html 上篇帖子: 校招开篇第一次面试——打响校招的第一枪(二) 下篇帖子: 消息队列 RabbitMQ 与 Spring 整合使用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表