Python异步非阻塞IO多路复用Select/Poll/Epoll使用
有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:
[*]#!/usr/bin/python
[*]# -*- coding: utf-8 -*-
[*]import select
[*]import socket
[*]import Queue
[*]
[*]server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
[*]server.setblocking(False)
[*]server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
[*]server_address= ('192.168.1.5',8080)
[*]server.bind(server_address)
[*]server.listen(10)
[*]
[*]#select轮询等待读socket集合
[*]inputs = [server]
[*]#select轮询等待写socket集合
[*]outputs = []
[*]message_queues = {}
[*]#select超时时间
[*]timeout = 20
[*]
[*]while True:
[*] print "等待活动连接......"
[*] readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
[*]
[*] if not (readable or writable or exceptional) :
[*] print "select超时无活动连接,重新select...... "
[*] continue;
[*] #循环可读事件
[*] for s in readable :
[*] #如果是server监听的socket
[*] if s is server:
[*] #同意连接
[*] connection, client_address = s.accept()
[*] print "新连接: ", client_address
[*] connection.setblocking(0)
[*] #将连接加入到select可读事件队列
[*] inputs.append(connection)
[*] #新建连接为key的字典,写回读取到的消息
[*] message_queues[connection] = Queue.Queue()
[*] else:
[*] #不是本机监听就是客户端发来的消息
[*] data = s.recv(1024)
[*] if data :
[*] print "收到数据:" , data , "客户端:",s.getpeername()
[*] message_queues[s].put(data)
[*] if s not in outputs:
[*] #将读取到的socket加入到可写事件队列
[*] outputs.append(s)
[*] else:
[*] #空白消息,关闭连接
[*] print "关闭连接:", client_address
[*] if s in outputs :
[*] outputs.remove(s)
[*] inputs.remove(s)
[*] s.close()
[*] del message_queues[s]
[*] for s in writable:
[*] try:
[*] msg = message_queues[s].get_nowait()
[*] except Queue.Empty:
[*] print "连接:" , s.getpeername() , '消息队列为空'
[*] outputs.remove(s)
[*] else:
[*] print "发送数据:" , msg , "到", s.getpeername()
[*] s.send(msg)
[*]
[*] for s in exceptional:
[*] print "异常连接:", s.getpeername()
[*] inputs.remove(s)
[*] if s in outputs:
[*] outputs.remove(s)
[*] s.close()
[*] del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:
[*]#!/usr/bin/python
[*]# -*- coding: utf-8 -*-
[*]import socket
[*]import select
[*]import Queue
[*]
[*]server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
[*]server.setblocking(False)
[*]server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[*]server_address = ("192.168.1.5", 8080)
[*]server.bind(server_address)
[*]server.listen(5)
[*]print "服务器启动成功,监听IP:" , server_address
[*]message_queues = {}
[*]#超时,毫秒
[*]timeout = 5000
[*]#监听哪些事件
[*]READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
[*]READ_WRITE = (READ_ONLY|select.POLLOUT)
[*]#新建轮询事件对象
[*]poller = select.poll()
[*]#注册本机监听socket到等待可读事件事件集合
[*]poller.register(server,READ_ONLY)
[*]#文件描述符到socket映射
[*]fd_to_socket = {server.fileno():server,}
[*]while True:
[*] print "等待活动连接......"
[*] #轮询注册的事件集合
[*] events = poller.poll(timeout)
[*] if not events:
[*] print "poll超时,无活动连接,重新poll......"
[*] continue
[*] print "有" , len(events), "个新事件,开始处理......"
[*] for fd ,flag in events:
[*] s = fd_to_socket[fd]
[*] #可读事件
[*] if flag & (select.POLLIN | select.POLLPRI) :
[*] if s is server :
[*] #如果socket是监听的server代表有新连接
[*] connection , client_address = s.accept()
[*] print "新连接:" , client_address
[*] connection.setblocking(False)
[*]
[*] fd_to_socket[connection.fileno()] = connection
[*] #加入到等待读事件集合
[*] poller.register(connection,READ_ONLY)
[*] message_queues[connection] = Queue.Queue()
[*] else :
[*] #接收客户端发送的数据
[*] data = s.recv(1024)
[*] if data:
[*] print "收到数据:" , data , "客户端:" , s.getpeername()
[*] message_queues[s].put(data)
[*] #修改读取到消息的连接到等待写事件集合
[*] poller.modify(s,READ_WRITE)
[*] else :
[*] # Close the connection
[*] print "closing" , s.getpeername()
[*] # Stop listening for input on the connection
[*] poller.unregister(s)
[*] s.close()
[*] del message_queues[s]
[*] #连接关闭事件
[*] elif flag & select.POLLHUP :
[*] print " Closing ", s.getpeername() ,"(HUP)"
[*] poller.unregister(s)
[*] s.close()
[*] #可写事件
[*] elif flag & select.POLLOUT :
[*] try:
[*] msg = message_queues[s].get_nowait()
[*] except Queue.Empty:
[*] print s.getpeername() , " queue empty"
[*] poller.modify(s,READ_ONLY)
[*] else :
[*] print "发送数据:" , data , "客户端:" , s.getpeername()
[*] s.send(msg)
[*] #异常事件
[*] elif flag & select.POLLERR:
[*] print "exception on" , s.getpeername()
[*] poller.unregister(s)
[*] s.close()
[*] del message_queues[s]
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
[*]#!/usr/bin/python
[*]# -*- coding: utf-8 -*-
[*]import socket, select
[*]import Queue
[*]
[*]serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
[*]serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
[*]server_address = ("192.168.1.5", 8080)
[*]serversocket.bind(server_address)
[*]serversocket.listen(1)
[*]print "服务器启动成功,监听IP:" , server_address
[*]serversocket.setblocking(0)
[*]timeout = 10
[*]#新建epoll事件对象,后续要监控的事件添加到其中
[*]epoll = select.epoll()
[*]#添加服务器监听fd到等待读事件集合
[*]epoll.register(serversocket.fileno(), select.EPOLLIN)
[*]message_queues = {}
[*]
[*]fd_to_socket = {serversocket.fileno():serversocket,}
[*]while True:
[*]print "等待活动连接......"
[*]#轮询注册的事件集合
[*]events = epoll.poll(timeout)
[*]if not events:
[*] print "epoll超时无活动连接,重新轮询......"
[*] continue
[*]print "有" , len(events), "个新事件,开始处理......"
[*]for fd, event in events:
[*] socket = fd_to_socket[fd]
[*] #可读事件
[*] if event & select.EPOLLIN:
[*] #如果活动socket为服务器所监听,有新连接
[*] if socket == serversocket:
[*] connection, address = serversocket.accept()
[*] print "新连接:" , address
[*] connection.setblocking(0)
[*] #注册新连接fd到待读事件集合
[*] epoll.register(connection.fileno(), select.EPOLLIN)
[*] fd_to_socket[connection.fileno()] = connection
[*] message_queues[connection] = Queue.Queue()
[*] #否则为客户端发送的数据
[*] else:
[*] data = socket.recv(1024)
[*] if data:
[*] print "收到数据:" , data , "客户端:" , socket.getpeername()
[*] message_queues[socket].put(data)
[*] #修改读取到消息的连接到等待写事件集合
[*] epoll.modify(fd, select.EPOLLOUT)
[*] #可写事件
[*] elif event & select.EPOLLOUT:
[*] try:
[*] msg = message_queues[socket].get_nowait()
[*] except Queue.Empty:
[*] print socket.getpeername() , " queue empty"
[*] epoll.modify(fd, select.EPOLLIN)
[*] else :
[*] print "发送数据:" , data , "客户端:" , socket.getpeername()
[*] socket.send(msg)
[*] #关闭事件
[*] elif event & select.EPOLLHUP:
[*] epoll.unregister(fd)
[*] fd_to_socket[fd].close()
[*] del fd_to_socket[fd]
[*]epoll.unregister(serversocket.fileno())
[*]epoll.close()
[*]serversocket.close()
原文链接
页:
[1]