设为首页 收藏本站
查看: 1199|回复: 0

[经验分享] 41. Python Queue 多进程的消息队列 PIPE

[复制链接]

尚未签到

发表于 2018-8-9 08:31:20 | 显示全部楼层 |阅读模式
  消息队列:
  消息队列是在消息传输过程中保存消息的容器。
  消息队列最经典的用法就是消费者和生产者之间通过消息管道来传递消息,消费者和生产生是不通的进程。生产者往管道中写消息,消费者从管道中读消息。
DSC0000.jpg

  相当于水管,有一个入口和出口,水从入口流入出口流出,这就是一个消息队列
  线程或进程往队列里面添加数据,出口从队列里面读数据
  左侧多线程往入口处添加完数据,任务就结束了;右侧只要依次从水管里取数据就行了。
  异步完成的任务
  比如京东下单,下单后付完钱,相当于把消息堆在了水管里,后台会有线程去接收这个单的消息,然后去库房,发货,走物流,直到接收货物并签收完,点击完成,整个流程才走完。
  客户交完钱后,丢了个消息在这个队列中,会给客户返回一个结果,告知你已经买了这个商品;而后面接收订单消息,发货,物流都是后面的"进程"或"线程"干的事情。
  所以,一般在异步处理问题时候,都会用到消息队列处理的这种思想。
  使用multiprocessing里面的Queue来实现消息队列。
  语法:
from mutliprocessing import Queue  
q = Queue
  
q.put(data)
  
data = q.get(data)
  举例:
from multiprocessing import Queue, Process  

  
def write(q):
  
    for i in ['a','b','c','d']:
  
        q.put(i)
  
        print ('put {0} to queue'.format(i))
  

  
def read(q):
  
    while 1:
  
        result = q.get()
  
        print ("get {0} from queue".format(result))
  

  

  
def main():
  
    q = Queue()
  
    pw = Process(target=write,args=(q,))
  
    pr = Process(target=read,args=(q,))
  
    pw.start()
  
    pr.start()
  
    pw.join()
  
    pr.terminate()  #停止
  
    # 相当于join,等pr完成以后,当whlie没有任何执行后,结束。
  

  
if __name__ == '__main__':
  
    main()
  返回结果:
put a to queue  
get a from queue
  
put b to queue
  
get b from queue
  
put c to queue
  
get c from queue
  
put d to queue
  
get d from queue
  PIPE:
  多进程里面有个pipe的方法来实现消息队列:
  1. Pipe 方法返回(conn1, conn2)代表一个管道的两端。PIPE方法有个deplex参数,如果deplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接收消息,conn2负责发送消息。
  2.send 和recv方法分别是发送和接受消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。
import time  
from multiprocessing import Pipe, Process
  

  

  
def proc1(pipe):
  
    for i in xrange(1, 10):
  
        pipe.send(i)
  
        print ("send {0} to pipe".format(i))
  
        time.sleep(1)
  

  

  
def proc2(pipe):
  
    n = 9
  
    while n > 0:
  
        result = pipe.recv()
  
        print ("recv {0} from pipe".format(result))
  
        n -= 1
  

  

  

  
def main():
  
    pipe = Pipe(duplex=False)
  
    print (type(pipe))
  
    p1 = Process(target=proc1, args=(pipe[1],))
  
    p2 = Process(target=proc2, args=(pipe[0],)) #接收写0
  
    p1.start()
  
    p2.start()
  
    p1.join()
  
    p2.join()
  
    pipe[0].close()
  
    pipe[1].close()
  

  

  
if __name__ == '__main__':
  
    main()
  返回结果(逐行打印):
<type 'tuple'>  
send 1 to pipe
  
recv 1 from pipe
  
recv 2 from pipe
  
send 2 to pipe
  
send 3 to pipe
  
recv 3 from pipe
  
recv 4 from pipe
  
send 4 to pipe
  
send 5 to pipe
  
recv 5 from pipe
  
recv 6 from pipe
  
send 6 to pipe
  
recv 7 from pipe
  
send 7 to pipe
  
recv 8 from pipe
  
send 8 to pipe
  
send 9 to pipe
  
recv 9 from pipe

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-548903-1-1.html 上篇帖子: python命令行参数模块argparse 下篇帖子: python 3 ---字符串方法使用整理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表