rrt44 发表于 2015-12-15 09:31:43

Python异步非阻塞IO多路复用Select/Poll/Epoll使用

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:




[*]#!/usr/bin/python

[*]# -*- coding: utf-8 -*-
[*]import select
[*]import socket
[*]import Queue
[*]
[*]server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
[*]server.setblocking(False)
[*]server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
[*]server_address= ('192.168.1.5',8080)
[*]server.bind(server_address)
[*]server.listen(10)
[*]
[*]#select轮询等待读socket集合
[*]inputs = [server]
[*]#select轮询等待写socket集合
[*]outputs = []
[*]message_queues = {}
[*]#select超时时间
[*]timeout = 20
[*]
[*]while True:
[*]    print "等待活动连接......"
[*]    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
[*]
[*]    if not (readable or writable or exceptional) :
[*]      print "select超时无活动连接,重新select...... "
[*]      continue;
[*]    #循环可读事件
[*]    for s in readable :
[*]      #如果是server监听的socket
[*]      if s is server:
[*]            #同意连接
[*]            connection, client_address = s.accept()
[*]            print "新连接: ", client_address
[*]            connection.setblocking(0)
[*]            #将连接加入到select可读事件队列
[*]            inputs.append(connection)
[*]            #新建连接为key的字典,写回读取到的消息
[*]            message_queues[connection] = Queue.Queue()
[*]      else:
[*]            #不是本机监听就是客户端发来的消息
[*]            data = s.recv(1024)
[*]            if data :
[*]                print "收到数据:" , data , "客户端:",s.getpeername()
[*]                message_queues[s].put(data)
[*]                if s not in outputs:
[*]                  #将读取到的socket加入到可写事件队列
[*]                  outputs.append(s)
[*]            else:
[*]                #空白消息,关闭连接
[*]                print "关闭连接:", client_address
[*]                if s in outputs :
[*]                  outputs.remove(s)
[*]                inputs.remove(s)
[*]                s.close()
[*]                del message_queues[s]
[*]    for s in writable:
[*]      try:
[*]            msg = message_queues[s].get_nowait()
[*]      except Queue.Empty:
[*]            print "连接:" , s.getpeername() , '消息队列为空'
[*]            outputs.remove(s)
[*]      else:
[*]            print "发送数据:" , msg , "到", s.getpeername()
[*]            s.send(msg)
[*]
[*]    for s in exceptional:
[*]      print "异常连接:", s.getpeername()
[*]      inputs.remove(s)
[*]      if s in outputs:
[*]            outputs.remove(s)
[*]      s.close()
[*]      del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:




[*]#!/usr/bin/python

[*]# -*- coding: utf-8 -*-
[*]import socket
[*]import select
[*]import Queue
[*]
[*]server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
[*]server.setblocking(False)
[*]server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[*]server_address = ("192.168.1.5", 8080)
[*]server.bind(server_address)
[*]server.listen(5)
[*]print "服务器启动成功,监听IP:" , server_address
[*]message_queues = {}
[*]#超时,毫秒
[*]timeout = 5000
[*]#监听哪些事件
[*]READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
[*]READ_WRITE = (READ_ONLY|select.POLLOUT)
[*]#新建轮询事件对象
[*]poller = select.poll()
[*]#注册本机监听socket到等待可读事件事件集合
[*]poller.register(server,READ_ONLY)
[*]#文件描述符到socket映射
[*]fd_to_socket = {server.fileno():server,}
[*]while True:
[*]    print "等待活动连接......"
[*]    #轮询注册的事件集合
[*]    events = poller.poll(timeout)
[*]    if not events:
[*]      print "poll超时,无活动连接,重新poll......"
[*]      continue
[*]    print "有" , len(events), "个新事件,开始处理......"
[*]    for fd ,flag in events:
[*]      s = fd_to_socket[fd]
[*]      #可读事件
[*]      if flag & (select.POLLIN | select.POLLPRI) :
[*]            if s is server :
[*]                #如果socket是监听的server代表有新连接
[*]                connection , client_address = s.accept()
[*]                print "新连接:" , client_address
[*]                connection.setblocking(False)
[*]
[*]                fd_to_socket[connection.fileno()] = connection
[*]                #加入到等待读事件集合
[*]                poller.register(connection,READ_ONLY)
[*]                message_queues[connection] = Queue.Queue()
[*]            else :
[*]                #接收客户端发送的数据
[*]                data = s.recv(1024)
[*]                if data:
[*]                  print "收到数据:" , data , "客户端:" , s.getpeername()
[*]                  message_queues[s].put(data)
[*]                  #修改读取到消息的连接到等待写事件集合
[*]                  poller.modify(s,READ_WRITE)
[*]                else :
[*]                  # Close the connection
[*]                  print "closing" , s.getpeername()
[*]                  # Stop listening for input on the connection
[*]                  poller.unregister(s)
[*]                  s.close()
[*]                  del message_queues[s]
[*]      #连接关闭事件
[*]      elif flag & select.POLLHUP :
[*]            print " Closing ", s.getpeername() ,"(HUP)"
[*]            poller.unregister(s)
[*]            s.close()
[*]      #可写事件
[*]      elif flag & select.POLLOUT :
[*]            try:
[*]                msg = message_queues[s].get_nowait()
[*]            except Queue.Empty:
[*]                print s.getpeername() , " queue empty"
[*]                poller.modify(s,READ_ONLY)
[*]            else :
[*]                print "发送数据:" , data , "客户端:" , s.getpeername()
[*]                s.send(msg)
[*]      #异常事件
[*]      elif flag & select.POLLERR:
[*]            print "exception on" , s.getpeername()
[*]            poller.unregister(s)
[*]            s.close()
[*]            del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:




[*]#!/usr/bin/python

[*]# -*- coding: utf-8 -*-
[*]import socket, select
[*]import Queue
[*]
[*]serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
[*]serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[*]server_address = ("192.168.1.5", 8080)
[*]serversocket.bind(server_address)
[*]serversocket.listen(1)
[*]print "服务器启动成功,监听IP:" , server_address
[*]serversocket.setblocking(0)
[*]timeout = 10
[*]#新建epoll事件对象,后续要监控的事件添加到其中
[*]epoll = select.epoll()
[*]#添加服务器监听fd到等待读事件集合
[*]epoll.register(serversocket.fileno(), select.EPOLLIN)
[*]message_queues = {}
[*]
[*]fd_to_socket = {serversocket.fileno():serversocket,}
[*]while True:
[*]print "等待活动连接......"
[*]#轮询注册的事件集合
[*]events = epoll.poll(timeout)
[*]if not events:
[*]   print "epoll超时无活动连接,重新轮询......"
[*]   continue
[*]print "有" , len(events), "个新事件,开始处理......"
[*]for fd, event in events:
[*]   socket = fd_to_socket[fd]
[*]   #可读事件
[*]   if event & select.EPOLLIN:
[*]         #如果活动socket为服务器所监听,有新连接
[*]         if socket == serversocket:
[*]            connection, address = serversocket.accept()
[*]            print "新连接:" , address
[*]            connection.setblocking(0)
[*]            #注册新连接fd到待读事件集合
[*]            epoll.register(connection.fileno(), select.EPOLLIN)
[*]            fd_to_socket[connection.fileno()] = connection
[*]            message_queues[connection] = Queue.Queue()
[*]         #否则为客户端发送的数据
[*]         else:
[*]            data = socket.recv(1024)
[*]            if data:
[*]               print "收到数据:" , data , "客户端:" , socket.getpeername()
[*]               message_queues[socket].put(data)
[*]               #修改读取到消息的连接到等待写事件集合
[*]               epoll.modify(fd, select.EPOLLOUT)
[*]   #可写事件
[*]   elif event & select.EPOLLOUT:
[*]      try:
[*]         msg = message_queues[socket].get_nowait()
[*]      except Queue.Empty:
[*]         print socket.getpeername() , " queue empty"
[*]         epoll.modify(fd, select.EPOLLIN)
[*]      else :
[*]         print "发送数据:" , data , "客户端:" , socket.getpeername()
[*]         socket.send(msg)
[*]   #关闭事件
[*]   elif event & select.EPOLLHUP:
[*]      epoll.unregister(fd)
[*]      fd_to_socket[fd].close()
[*]      del fd_to_socket[fd]
[*]epoll.unregister(serversocket.fileno())
[*]epoll.close()
[*]serversocket.close()
原文链接


页: [1]
查看完整版本: Python异步非阻塞IO多路复用Select/Poll/Epoll使用