设为首页 收藏本站
查看: 1243|回复: 0

[经验分享] Python异步非阻塞IO多路复用Select/Poll/Epoll使用

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-12-15 09:31:43 | 显示全部楼层 |阅读模式
有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:




  • #!/usr/bin/python

  • # -*- coding: utf-8 -*-
  • import select
  • import socket
  • import Queue

  • server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  • server.setblocking(False)
  • server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
  • server_address= ('192.168.1.5',8080)
  • server.bind(server_address)
  • server.listen(10)

  • #select轮询等待读socket集合
  • inputs = [server]
  • #select轮询等待写socket集合
  • outputs = []
  • message_queues = {}
  • #select超时时间
  • timeout = 20

  • while True:
  •     print "等待活动连接......"
  •     readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)

  •     if not (readable or writable or exceptional) :
  •         print "select超时无活动连接,重新select...... "
  •         continue;
  •     #循环可读事件
  •     for s in readable :
  •         #如果是server监听的socket
  •         if s is server:
  •             #同意连接
  •             connection, client_address = s.accept()
  •             print "新连接: ", client_address
  •             connection.setblocking(0)
  •             #将连接加入到select可读事件队列
  •             inputs.append(connection)
  •             #新建连接为key的字典,写回读取到的消息
  •             message_queues[connection] = Queue.Queue()
  •         else:
  •             #不是本机监听就是客户端发来的消息
  •             data = s.recv(1024)
  •             if data :
  •                 print "收到数据:" , data , "客户端:",s.getpeername()
  •                 message_queues[s].put(data)
  •                 if s not in outputs:
  •                     #将读取到的socket加入到可写事件队列
  •                     outputs.append(s)
  •             else:
  •                 #空白消息,关闭连接
  •                 print "关闭连接:", client_address
  •                 if s in outputs :
  •                     outputs.remove(s)
  •                 inputs.remove(s)
  •                 s.close()
  •                 del message_queues[s]
  •     for s in writable:
  •         try:
  •             msg = message_queues[s].get_nowait()
  •         except Queue.Empty:
  •             print "连接:" , s.getpeername() , '消息队列为空'
  •             outputs.remove(s)
  •         else:
  •             print "发送数据:" , msg , "到", s.getpeername()
  •             s.send(msg)

  •     for s in exceptional:
  •         print "异常连接:", s.getpeername()
  •         inputs.remove(s)
  •         if s in outputs:
  •             outputs.remove(s)
  •         s.close()
  •         del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:




  • #!/usr/bin/python

  • # -*- coding: utf-8 -*-
  • import socket
  • import select
  • import Queue

  • server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • server.setblocking(False)
  • server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  • server_address = ("192.168.1.5", 8080)
  • server.bind(server_address)
  • server.listen(5)
  • print "服务器启动成功,监听IP:" , server_address
  • message_queues = {}
  • #超时,毫秒
  • timeout = 5000
  • #监听哪些事件
  • READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
  • READ_WRITE = (READ_ONLY|select.POLLOUT)
  • #新建轮询事件对象
  • poller = select.poll()
  • #注册本机监听socket到等待可读事件事件集合
  • poller.register(server,READ_ONLY)
  • #文件描述符到socket映射
  • fd_to_socket = {server.fileno():server,}
  • while True:
  •     print "等待活动连接......"
  •     #轮询注册的事件集合
  •     events = poller.poll(timeout)
  •     if not events:
  •       print "poll超时,无活动连接,重新poll......"
  •       continue
  •     print "有" , len(events), "个新事件,开始处理......"
  •     for fd ,flag in events:
  •         s = fd_to_socket[fd]
  •         #可读事件
  •         if flag & (select.POLLIN | select.POLLPRI) :
  •             if s is server :
  •                 #如果socket是监听的server代表有新连接
  •                 connection , client_address = s.accept()
  •                 print "新连接:" , client_address
  •                 connection.setblocking(False)

  •                 fd_to_socket[connection.fileno()] = connection
  •                 #加入到等待读事件集合
  •                 poller.register(connection,READ_ONLY)
  •                 message_queues[connection] = Queue.Queue()
  •             else :
  •                 #接收客户端发送的数据
  •                 data = s.recv(1024)
  •                 if data:
  •                     print "收到数据:" , data , "客户端:" , s.getpeername()
  •                     message_queues[s].put(data)
  •                     #修改读取到消息的连接到等待写事件集合
  •                     poller.modify(s,READ_WRITE)
  •                 else :
  •                     # Close the connection
  •                     print "  closing" , s.getpeername()
  •                     # Stop listening for input on the connection
  •                     poller.unregister(s)
  •                     s.close()
  •                     del message_queues[s]
  •         #连接关闭事件
  •         elif flag & select.POLLHUP :
  •             print " Closing ", s.getpeername() ,"(HUP)"
  •             poller.unregister(s)
  •             s.close()
  •         #可写事件
  •         elif flag & select.POLLOUT :
  •             try:
  •                 msg = message_queues[s].get_nowait()
  •             except Queue.Empty:
  •                 print s.getpeername() , " queue empty"
  •                 poller.modify(s,READ_ONLY)
  •             else :
  •                 print "发送数据:" , data , "客户端:" , s.getpeername()
  •                 s.send(msg)
  •         #异常事件
  •         elif flag & select.POLLERR:
  •             print "  exception on" , s.getpeername()
  •             poller.unregister(s)
  •             s.close()
  •             del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:




  • #!/usr/bin/python

  • # -*- coding: utf-8 -*-
  • import socket, select
  • import Queue

  • serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  • serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  • server_address = ("192.168.1.5", 8080)
  • serversocket.bind(server_address)
  • serversocket.listen(1)
  • print "服务器启动成功,监听IP:" , server_address
  • serversocket.setblocking(0)
  • timeout = 10
  • #新建epoll事件对象,后续要监控的事件添加到其中
  • epoll = select.epoll()
  • #添加服务器监听fd到等待读事件集合
  • epoll.register(serversocket.fileno(), select.EPOLLIN)
  • message_queues = {}

  • fd_to_socket = {serversocket.fileno():serversocket,}
  • while True:
  •   print "等待活动连接......"
  •   #轮询注册的事件集合
  •   events = epoll.poll(timeout)
  •   if not events:
  •      print "epoll超时无活动连接,重新轮询......"
  •      continue
  •   print "有" , len(events), "个新事件,开始处理......"
  •   for fd, event in events:
  •      socket = fd_to_socket[fd]
  •      #可读事件
  •      if event & select.EPOLLIN:
  •          #如果活动socket为服务器所监听,有新连接
  •          if socket == serversocket:
  •             connection, address = serversocket.accept()
  •             print "新连接:" , address
  •             connection.setblocking(0)
  •             #注册新连接fd到待读事件集合
  •             epoll.register(connection.fileno(), select.EPOLLIN)
  •             fd_to_socket[connection.fileno()] = connection
  •             message_queues[connection] = Queue.Queue()
  •          #否则为客户端发送的数据
  •          else:
  •             data = socket.recv(1024)
  •             if data:
  •                print "收到数据:" , data , "客户端:" , socket.getpeername()
  •                message_queues[socket].put(data)
  •                #修改读取到消息的连接到等待写事件集合
  •                epoll.modify(fd, select.EPOLLOUT)
  •      #可写事件
  •      elif event & select.EPOLLOUT:
  •         try:
  •            msg = message_queues[socket].get_nowait()
  •         except Queue.Empty:
  •            print socket.getpeername() , " queue empty"
  •            epoll.modify(fd, select.EPOLLIN)
  •         else :
  •            print "发送数据:" , data , "客户端:" , socket.getpeername()
  •            socket.send(msg)
  •      #关闭事件
  •      elif event & select.EPOLLHUP:
  •         epoll.unregister(fd)
  •         fd_to_socket[fd].close()
  •         del fd_to_socket[fd]
  • epoll.unregister(serversocket.fileno())
  • epoll.close()
  • serversocket.close()
原文链接



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-151378-1-1.html 上篇帖子: python enumerate用法 下篇帖子: Python 的 with 语句详解
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表