|
有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- import select
- import socket
- import Queue
- server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- server.setblocking(False)
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR , 1)
- server_address= ('192.168.1.5',8080)
- server.bind(server_address)
- server.listen(10)
- #select轮询等待读socket集合
- inputs = [server]
- #select轮询等待写socket集合
- outputs = []
- message_queues = {}
- #select超时时间
- timeout = 20
- while True:
- print "等待活动连接......"
- readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
- if not (readable or writable or exceptional) :
- print "select超时无活动连接,重新select...... "
- continue;
- #循环可读事件
- for s in readable :
- #如果是server监听的socket
- if s is server:
- #同意连接
- connection, client_address = s.accept()
- print "新连接: ", client_address
- connection.setblocking(0)
- #将连接加入到select可读事件队列
- inputs.append(connection)
- #新建连接为key的字典,写回读取到的消息
- message_queues[connection] = Queue.Queue()
- else:
- #不是本机监听就是客户端发来的消息
- data = s.recv(1024)
- if data :
- print "收到数据:" , data , "客户端:",s.getpeername()
- message_queues[s].put(data)
- if s not in outputs:
- #将读取到的socket加入到可写事件队列
- outputs.append(s)
- else:
- #空白消息,关闭连接
- print "关闭连接:", client_address
- if s in outputs :
- outputs.remove(s)
- inputs.remove(s)
- s.close()
- del message_queues[s]
- for s in writable:
- try:
- msg = message_queues[s].get_nowait()
- except Queue.Empty:
- print "连接:" , s.getpeername() , '消息队列为空'
- outputs.remove(s)
- else:
- print "发送数据:" , msg , "到", s.getpeername()
- s.send(msg)
- for s in exceptional:
- print "异常连接:", s.getpeername()
- inputs.remove(s)
- if s in outputs:
- outputs.remove(s)
- s.close()
- del message_queues[s]
Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- import socket
- import select
- import Queue
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server.setblocking(False)
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_address = ("192.168.1.5", 8080)
- server.bind(server_address)
- server.listen(5)
- print "服务器启动成功,监听IP:" , server_address
- message_queues = {}
- #超时,毫秒
- timeout = 5000
- #监听哪些事件
- READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
- READ_WRITE = (READ_ONLY|select.POLLOUT)
- #新建轮询事件对象
- poller = select.poll()
- #注册本机监听socket到等待可读事件事件集合
- poller.register(server,READ_ONLY)
- #文件描述符到socket映射
- fd_to_socket = {server.fileno():server,}
- while True:
- print "等待活动连接......"
- #轮询注册的事件集合
- events = poller.poll(timeout)
- if not events:
- print "poll超时,无活动连接,重新poll......"
- continue
- print "有" , len(events), "个新事件,开始处理......"
- for fd ,flag in events:
- s = fd_to_socket[fd]
- #可读事件
- if flag & (select.POLLIN | select.POLLPRI) :
- if s is server :
- #如果socket是监听的server代表有新连接
- connection , client_address = s.accept()
- print "新连接:" , client_address
- connection.setblocking(False)
- fd_to_socket[connection.fileno()] = connection
- #加入到等待读事件集合
- poller.register(connection,READ_ONLY)
- message_queues[connection] = Queue.Queue()
- else :
- #接收客户端发送的数据
- data = s.recv(1024)
- if data:
- print "收到数据:" , data , "客户端:" , s.getpeername()
- message_queues[s].put(data)
- #修改读取到消息的连接到等待写事件集合
- poller.modify(s,READ_WRITE)
- else :
- # Close the connection
- print " closing" , s.getpeername()
- # Stop listening for input on the connection
- poller.unregister(s)
- s.close()
- del message_queues[s]
- #连接关闭事件
- elif flag & select.POLLHUP :
- print " Closing ", s.getpeername() ,"(HUP)"
- poller.unregister(s)
- s.close()
- #可写事件
- elif flag & select.POLLOUT :
- try:
- msg = message_queues[s].get_nowait()
- except Queue.Empty:
- print s.getpeername() , " queue empty"
- poller.modify(s,READ_ONLY)
- else :
- print "发送数据:" , data , "客户端:" , s.getpeername()
- s.send(msg)
- #异常事件
- elif flag & select.POLLERR:
- print " exception on" , s.getpeername()
- poller.unregister(s)
- s.close()
- del message_queues[s]
Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:
- #!/usr/bin/python
- # -*- coding: utf-8 -*-
- import socket, select
- import Queue
- serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_address = ("192.168.1.5", 8080)
- serversocket.bind(server_address)
- serversocket.listen(1)
- print "服务器启动成功,监听IP:" , server_address
- serversocket.setblocking(0)
- timeout = 10
- #新建epoll事件对象,后续要监控的事件添加到其中
- epoll = select.epoll()
- #添加服务器监听fd到等待读事件集合
- epoll.register(serversocket.fileno(), select.EPOLLIN)
- message_queues = {}
- fd_to_socket = {serversocket.fileno():serversocket,}
- while True:
- print "等待活动连接......"
- #轮询注册的事件集合
- events = epoll.poll(timeout)
- if not events:
- print "epoll超时无活动连接,重新轮询......"
- continue
- print "有" , len(events), "个新事件,开始处理......"
- for fd, event in events:
- socket = fd_to_socket[fd]
- #可读事件
- if event & select.EPOLLIN:
- #如果活动socket为服务器所监听,有新连接
- if socket == serversocket:
- connection, address = serversocket.accept()
- print "新连接:" , address
- connection.setblocking(0)
- #注册新连接fd到待读事件集合
- epoll.register(connection.fileno(), select.EPOLLIN)
- fd_to_socket[connection.fileno()] = connection
- message_queues[connection] = Queue.Queue()
- #否则为客户端发送的数据
- else:
- data = socket.recv(1024)
- if data:
- print "收到数据:" , data , "客户端:" , socket.getpeername()
- message_queues[socket].put(data)
- #修改读取到消息的连接到等待写事件集合
- epoll.modify(fd, select.EPOLLOUT)
- #可写事件
- elif event & select.EPOLLOUT:
- try:
- msg = message_queues[socket].get_nowait()
- except Queue.Empty:
- print socket.getpeername() , " queue empty"
- epoll.modify(fd, select.EPOLLIN)
- else :
- print "发送数据:" , data , "客户端:" , socket.getpeername()
- socket.send(msg)
- #关闭事件
- elif event & select.EPOLLHUP:
- epoll.unregister(fd)
- fd_to_socket[fd].close()
- del fd_to_socket[fd]
- epoll.unregister(serversocket.fileno())
- epoll.close()
- serversocket.close()
原文链接
|
|