q986 发表于 2018-1-4 06:45:10

了解saltstack的通信协议zeromq(二) 转

  上文讨论了PAIR/PAIR,REQ/REP两种模式,现在看看PUB/SUB和PUSH/PULL模式。
  PUB/SUB:发布订阅模式,跟我们订阅新闻类似的,采用异步IO,多对多模式,如果没有订阅,服务端发送的消息直接丢弃掉。

  pub_server.py
import zmqimport randomimport sysimport time port = "5556"if len(sys.argv) > 1:      port =sys.argv      int(port) context = zmq.Context()socket = context.socket(zmq.PUB)socket.bind("tcp://*:%s" % port) while True:      topic = random.randrange(9999,10005)      messagedata = random.randrange(1,215) - 80      print "%d %d" % (topic, messagedata)      socket.send("%d %d" % (topic, messagedata))      time.sleep(1)  sub_client.py
import sysimport timeimport zmq port = "5556"# Socket to talk to servercontext = zmq.Context()socket = context.socket(zmq.SUB) print "Collecting updates from weather server..."socket.connect("tcp://localhost:%s" % port)   #socket.set(zmq.UNSUBSCRIBE, messagedata)topicfilter = "10001"socket.set(zmq.SUBSCRIBE, topicfilter) #Process 5 updatestotal_value = 0#for update_nbr in range (5):while True:      string = socket.recv()      topic, messagedata = string.split()#       total_value += int(messagedata)      print topic, messagedata      time.sleep(1)  zmq.SUBCRIBE是用来指明订阅某种消息,这里订阅的是出现10001的信息
  PUSH/PULL:任务分发模式,主要用于分布式计算的,将很多个任务分发到worker,然后worker将计算结果发送到结果收集器。

  producer.py
import timeimport zmqimport random context = zmq.Context()sender = context.socket(zmq.PUSH)sender.bind('tcp://*:5557') # sync start of batch# be sure all worker connect successsink = context.socket(zmq.PUSH)sink.connect('tcp://0.0.0.0:5558')print 'Press Enter when the workers are ready:'_ = raw_input()print 'Sending tasks to workers...'sink.send(b'0')for task_nbr in xrange(1000000):      workload = random.randint(1,10)      sender.send_string(u'%i' % workload) for i in range(10):      sender.send_string(u'0')time.sleep(1)  consumer.py
import sysimport timeimport zmq context = zmq.Context() # Socket to recevie messages onreceiver = context.socket(zmq.PULL)receiver.connect('tcp://localhost:5557') # socket to send messagessender = context.socket(zmq.PUSH)sender.connect('tcp://localhost:5558') while True:      a_str = receiver.recv_string()      num = int(a_str)      if num % 2 == 0 or a_str == u'0':                sender.send_string(a_str)  result.py
import sysimport timeimport zmq context = zmq.Context()# Socket to receive messages onreceiver = context.socket(zmq.PULL)receiver.bind("tcp://*:5558") # Wait for start of batchs = receiver.recv()sum = 0flag = 0# Start our clock nowtstart = time.time()while True:      a_str = receiver.recv_string()      num = int(a_str)      sum += num      if a_str == '0':                flag += 1      if flag == 10:                break tend = time.time()tdiff = tend - tstarttotal_msec = tdiff * 1000print "Total elapsed time: %d msec" % total_msec
  这个结果并不精确,分别是启动1个、2个、3个、4个consumer.py进程的测试结果,说明计算缩短了时间。
  Queue,Forwarder,Streamer分别是REQ/REP、PUB/SUB、PUSH/PULL的代理,用于代理不同网段的机器。
  关于代理的用法,这里不讲述。请参考下面地址。
页: [1]
查看完整版本: 了解saltstack的通信协议zeromq(二) 转