设为首页 收藏本站
查看: 1207|回复: 0

[经验分享] python Asyncore.dispatcher 理解

[复制链接]
累计签到:4 天
连续签到:1 天
发表于 2015-11-30 12:56:28 | 显示全部楼层 |阅读模式
1、Asyncore是python的标准库。Asyncore.dispatcher 是这个库中的一个socket的框架,为socket添加了一些通用的回调方法,比如:

def listen(self, num):

def bind(self, addr):

def connect(self, address):

def accept(self):

def send(self, data):

def recv(self, buffer_size):

def close(self):
def handle_read(self):
self.log_info('unhandled read event', 'warning')
def handle_write(self):
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
self.log_info('unhandled connect event', 'warning')
def handle_accept(self):
self.log_info('unhandled accept event', 'warning')
def handle_close(self):
self.log_info('unhandled close event', 'warning')
self.close()

  
  2、Asyncore.dispatcher 对socket进行了一次封装之后,这个socket就可以添加到一个全局的字典中,Asyncore模块中的loop函数会对这些socket文件描述符进行监视,具体可能是select或者poll方法实现,windows上面是poll实现的。通过I/O多路复用实现同时处理多个请求,避免使用多线程或者多进程的方式。下面中loop中传入的map就是socket-dispatcher的对象集合,当发现对应的socket有可读可写的请求时,调用对应的handle_read 和handle_write方法。



def poll(timeout=0.0, map=None):
if map is None:
map = socket_map
if map:
r = []; w = []; e = []
for fd, obj in map.items():
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
# accepting sockets should not be writable
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
try:
r, w, e = select.select(r, w, e, timeout)
except select.error, err:
if err.args[0] != EINTR:
raise
else:
return
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
def poll2(timeout=0.0, map=None):
# Use the poll() support added to the select module in Python 2.0
if map is None:
map = socket_map
if timeout is not None:
# timeout is in milliseconds
timeout = int(timeout*1000)
pollster = select.poll()
if map:
for fd, obj in map.items():
flags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
# accepting sockets should not be writable
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
if flags:
# Only check for exceptions if object was either readable
# or writable.
flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL
pollster.register(fd, flags)
try:
r = pollster.poll(timeout)
except select.error, err:
if err.args[0] != EINTR:
raise
r = []
for fd, flags in r:
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
poll3 = poll2                           # Alias for backward compatibility
def loop(timeout=30.0, use_poll=False, map=None, count=None):
if map is None:
map = socket_map
if use_poll and hasattr(select, 'poll'):
poll_fun = poll2
else:
poll_fun = poll
if count is None:
while map:
poll_fun(timeout, map)
else:
while map and count > 0:
poll_fun(timeout, map)
count = count – 1  
  3、如何实现一个异步的服务器,官方给出了一个比较好的代码:



import logging
import asyncore
import socket
logging.basicConfig(level=logging.DEBUG, format="%(created)-15s %(msecs)d %(levelname)8s %(thread)d %(name)s %(message)s")
log                     = logging.getLogger(__name__)
BACKLOG                 = 5
SIZE                    = 1024
class EchoHandler(asyncore.dispatcher):
def __init__(self, conn_sock, client_address, server):
self.server             = server
self.client_address     = client_address
self.buffer             = ""
# We dont have anything to write, to start with
self.is_writable        = False
# Create ourselves, but with an already provided socket
asyncore.dispatcher.__init__(self, conn_sock)
log.debug("created handler; waiting for loop")
def readable(self):
return True     # We are always happy to read
def writable(self):
return self.is_writable # But we might not have
# anything to send all the time
def handle_read(self):
log.debug("handle_read")
data = self.recv(SIZE)
log.debug("after recv")
if data:
log.debug("got data")
self.buffer += data
self.is_writable = True  # sth to send back now
else:
log.debug("got null data")
def handle_write(self):
log.debug("handle_write")
if self.buffer:
sent = self.send(self.buffer)
log.debug("sent data")
self.buffer = self.buffer[sent:]
else:
log.debug("nothing to send")
if len(self.buffer) == 0:
self.is_writable = False
# Will this ever get called?  Does loop() call
# handle_close() if we called close, to start with?
def handle_close(self):
log.debug("handle_close")
log.info("conn_closed: client_address=%s:%s" % \
(self.client_address[0],
self.client_address[1]))
self.close()
#pass
class EchoServer(asyncore.dispatcher):
allow_reuse_address         = False
request_queue_size          = 5
address_family              = socket.AF_INET
socket_type                 = socket.SOCK_STREAM
def __init__(self, address, handlerClass=EchoHandler):
self.address            = address
self.handlerClass       = handlerClass
asyncore.dispatcher.__init__(self)
self.create_socket(self.address_family,
self.socket_type)
if self.allow_reuse_address:
self.set_reuse_addr()
self.server_bind()
self.server_activate()
def server_bind(self):
self.bind(self.address)
log.debug("bind: address=%s:%s" % (self.address[0], self.address[1]))
def server_activate(self):
self.listen(self.request_queue_size)
log.debug("listen: backlog=%d" % self.request_queue_size)
def fileno(self):
return self.socket.fileno()
def serve_forever(self):
asyncore.loop()
# TODO: try to implement handle_request()
# Internal use
def handle_accept(self):
(conn_sock, client_address) = self.accept()
if self.verify_request(conn_sock, client_address):
self.process_request(conn_sock, client_address)
def verify_request(self, conn_sock, client_address):
return True
def process_request(self, conn_sock, client_address):
log.info("conn_made: client_address=%s:%s" % \
(client_address[0],
client_address[1]))
self.handlerClass(conn_sock, client_address, self)
def handle_close(self):
self.close()  
  使用示例:


interface = "0.0.0.0"
port = 8080
server = asyncore_echo_server.EchoServer((interface, port))
server.serve_forever()  
  代码中创建了两个dispatcher的子类,一个是server专门用于接受客户端的请求;一个是EchoHandler,用于收到请求的时候处理客户端的每一个请求。
  
  3、Asyncore.dispatcher 子类中的server和普通的Handler有什么区别?

  区别只有一个地方,dispatcher初始化的时候,server会先创建一个socket,binding本地的地址和端口,然后listening,在listening的过程中会执行下列代码


def listen(self, num):
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)

  当self.accepting = True 之后,客户端所有的请求会当成accept处理,也就是收到一个请求,得到一个socket。如果这个时候不把这个socket传给handler,那么这个socket因为没有人保存会立即释放,导致socket关闭。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-145381-1-1.html 上篇帖子: 【Python爬虫学习笔记(1)】urllib2库相关知识点总结 下篇帖子: python用法笔记(数组(list、touple、dict)、字符串)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表