设为首页 收藏本站
查看: 1205|回复: 0

[经验分享] saltstack系列(三)——zmq订阅/发布模式

[复制链接]

尚未签到

发表于 2018-1-4 07:36:12 | 显示全部楼层 |阅读模式
zmq订阅发布模式
  server端代码:
  

#coding=utf-8  
'''''
  
服务端,发布模式
  
'''
  
import zmq
  
from random import randrange
  
context = zmq.Context()
  
socket = context.socket(zmq.PUB)
  
socket.bind("tcp://127.0.0.1:8000")
  
while True:
  zipcode = randrange(1, 100000)
  temperature = randrange(-80, 135)
  relhumidity = randrange(10, 60)

  socket.send("%i %i %i" % (zipcode,temperature ,>  

  

  客户端代码:
  

#coding=utf-8  
'''''
  
订阅模式,如果设置了过滤条件,那么只会接收到以过滤条件开头的消息
  
'''
  
import sys
  
import zmq
  
#  Socket to talk to server
  
context = zmq.Context()
  
socket = context.socket(zmq.SUB)
  
print("Collecting updates from weather server...")
  
socket.connect("tcp://127.0.0.1:8000")
  
# Subscribe to zipcode, default is NYC, 10001
  
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002"
  
#此处设置过滤条件,只有以 zip_filter 开头的消息才会被接收
  
socket.setsockopt(zmq.SUBSCRIBE, zip_filter)
  
# Process 5 updates
  
total_temp = 0
  
for update_nbr in range(5):
  string = socket.recv()
  print string

  zipcode, temperature,>  total_temp += int(temperature)
  
print("Average temperature for zipcode '%s' was %dF" % (
  zip_filter, total_temp / update_nbr)
  
)
  

  


总结  
  1、 zmq的程序,也是要分清服务端和客户端的,服务端也是要绑定ip和端口的
  2、 如果我们先启动客户端,后启动服务端,那么程序是可以正常运行的,换成socket,就不行,socket只能先启动服务端,后启动客户端
  3、 学习zmq的过程,千万别总想着socket,你能用socket传输文件,但是如果用zmq做同样的事情,那你就错误的使用了zmq,记住,这是一个消息通信库,它自己实现了一些协议,使得我们可以非常轻松的在节点间,进程间,线程间传递消息,如果你对我刚才说的节点间,进程间,线程间传递消息没什么兴趣,说明,你平日里写的程序都是单进程,单线程的,只管顺序执行就好了,其他的不用考虑。
  4、广播,这种模式没有队列缓存,断开之后数据将丢失
  下面,分析这两段程序。
  1、 不论是服务端还是客户端,都需要获得zmq上下文
  

context = zmq.Context()  

  

  2、获得socket,这个socket不是我们平日里以为的那个socket。zmq里叫socket,我猜可能是为了方便大家学习才这么命名。它的表现,已经远远的超出了我们对以前的那个socket的了解。每一个socket都是有自己的类型的,示例中,服务端的socket的类型是zmq.PUB,客户端的socket的类型是zmq.SUB,pub是发布,sub是订阅。说的通俗点,就是有一个pub节点,可以有多个sub节点,pub节点发出去的消息,如果sub节点没有设置过滤条件,那么就会接收所有的消息,如果有过滤条件,就只接收满足过滤条件的消息。想想看,有没有那么一个时刻,你希望你的程序等待一个命令,收到命令后,你让程序去做一些事情?那么pub与sub模式非常适合这种应用场景。
  3、设置过滤条件很简单 
  

socket.setsockopt(zmq.SUBSCRIBE, zip_filter)  

  

  第二个参数就是你期望的过滤条件,只有那些以这个过滤条件开头的消息才会被接收  

问答环节
  问题1: 如果想创建多个socket怎么写?
  答: 一个上下文可以创建任意多个socket,完全不受限制
  问题2: 明明先启动了客户端,后启动的服务端,为啥有些消息却没有收到呢?
  答: 就算你先启动了客户端,服务端pub出去的一些消息也还是可能没有被收到,因为你启动服务端时,服务端与客户端要建立连接,而这个时候,消息其实已经发出去了,所以你没收到。
  问题3: 在订阅发布模型中,如果客户端断开连接,或是服务端断开连接会产生什么样的影响
  答: 如果是客户端断开连接,没什么的,就好比一堆人在听收音机,现在离开一个人,收音机继续播放喽。如果是服务端断开了呢,比如程序死掉了,那么请放心,客户端不会发生崩溃,只是阻塞在socket.recv() 这条语句上,更神奇的是,如果你恢复了服务端
  现在,我们修改一下客户端程序
  

#coding=utf-8  
'''''
  
订阅模式,如果设置了过滤条件,那么只会接收到以过滤条件开头的消息
  
'''
  
import sys
  
import zmq
  
import time
  
#  Socket to talk to server
  
context = zmq.Context()
  
socket = context.socket(zmq.SUB)
  
print("Collecting updates from weather server...")
  
socket.connect("tcp://localhost:8000")
  
# Subscribe to zipcode, default is NYC, 10001
  
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002"
  
#此处设置过滤条件,只有以 zip_filter 开头的消息才会被接收
  
socket.setsockopt(zmq.SUBSCRIBE, zip_filter)
  
# Process 5 updates
  
total_temp = 0
  
for update_nbr in range(50):
  print 'wait recv'
  string = socket.recv()
  print 'has recv'
  time.sleep(1)
  print string

  zipcode, temperature,>  total_temp += int(temperature)
  
print("Average temperature for zipcode '%s' was %dF" % (
  zip_filter, total_temp / update_nbr)
  
)
  

  

  服务端
  

#coding=utf-8  
'''''
  
服务端,发布模式
  
'''
  
import zmq
  
import time
  
from random import randrange
  
context = zmq.Context()
  
socket = context.socket(zmq.PUB)
  
socket.bind("tcp://*:8000")
  
while True:
  zipcode = randrange(1, 100000)
  temperature = randrange(-80, 135)
  relhumidity = randrange(10, 60)

  socket.send("%i %i %i" % (10002,temperature ,>  

  

  服务端和客户端都启动,这时候,客户端收到一条消息后会睡一秒钟,但是服务端却是一刻不停的在发送消息,那么问题来了,一个发的快,一个收的慢,那么这时候把服务端停掉会怎样呢?
  实际的效果是,服务端停下来了,客户端依然在接收消息,因为有一些消息被缓存起来了,虽然服务端不再发送了,客户端却依然可以接收得到,但这种接收,只是从之前接收的缓冲区里取数据。
  现在,我们在服务端最后加上一条语句,time.sleep(2),这样,服务端发送一条消息后,睡两秒钟,发的慢,收的快了,我们再次启动服务端和客户端,当客户端收到一些消息后,关掉服务端,这次,客户端很快就停止接收了,因为发的慢,所以缓冲区里没有数据,现在,我们再次启动服务端,你会发现,客户端又开始接收数据了,哈哈,神奇吧!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-431395-1-1.html 上篇帖子: 3、SaltStack之远程执行 下篇帖子: Saltstack之SaltSyndic
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表