[Experience Sharing] Async I/O and Python (Part 2)

Posted 2018-8-12 13:11:59
  This article is a translation of Mark McLoughlin's blog post
  Async I/O and Python
  Original: http://blogs.gnome.org/markmc/2013/06/04/async-io-and-python
Eventlet
  Ok, so how about eventlet? Presumably eventlet makes it a lot easier to implement non-blocking I/O than the above example? Here’s what it looks like with eventlet:
from eventlet.green import socket

sock = socket.socket()
sock.connect(('localhost', 1234))
sock.send('foo\n' * 10 * 1024 * 1024)
  Yes, that does look very like the first example. What has happened here is that by creating the socket using eventlet.green.socket.socket() we have put the socket into non-blocking mode, and when the write to the socket blocks, eventlet will schedule any other work that might be pending. Hitting Ctrl-C while this is running is actually pretty instructive:
$> python test-eventlet-write.py
^CTraceback (most recent call last):
  File "test-eventlet-write.py", line 6, in <module>
    sock.send('foo\n' * 10 * 1024 * 1024)
  File ".../eventlet/greenio.py", line 289, in send
    timeout_exc=socket.timeout("timed out"))
  File ".../eventlet/hubs/__init__.py", line 121, in trampoline
    return hub.switch()
  File ".../eventlet/hubs/hub.py", line 187, in switch
    return self.greenlet.switch()
  File ".../eventlet/hubs/hub.py", line 236, in run
    self.wait(sleep_time)
  File ".../eventlet/hubs/poll.py", line 84, in wait
    presult = self.do_poll(seconds)
  File ".../eventlet/hubs/epolls.py", line 61, in do_poll
    return self.poll.poll(seconds)
KeyboardInterrupt
  Yes, indeed, there’s a whole lot going on behind that innocuous looking send() call. You see mention of a ‘hub’ which is eventlet’s name for an event loop. You also see this trampoline() call which means “put the current code to sleep until the socket is writable”. And, there at the very end, we’re still sleeping in a call to poll() which is basically the same thing as select().
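What trampoline() is ultimately waiting for is the moment a non-blocking send() stops accepting data. That condition is easy to reproduce with only the standard library (a hypothetical standalone sketch, not part of the original post; it uses a socketpair instead of a real server so it is self-contained):

```python
import errno
import socket

# A connected socket pair lets us fill a kernel send buffer locally,
# with no server involved.
a, b = socket.socketpair()
a.setblocking(False)

total = 0
try:
    while True:
        total += a.send(b'x' * 65536)  # keep writing until the buffer fills
except OSError as e:
    # This EAGAIN/EWOULDBLOCK is exactly the condition eventlet's
    # trampoline() puts the current greenthread to sleep on.
    assert e.errno in (errno.EAGAIN, errno.EWOULDBLOCK)
    print('send buffer full after', total, 'bytes')
```

eventlet simply hides this loop from you: where this sketch raises, eventlet switches to another greenthread and retries the send once poll() reports the fd writable again.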
  To show the example of doing some “useful” work rather than sleeping all the time we run a busy loop greenthread:
import eventlet
from eventlet.green import socket

def busy_loop():
    while True:
        i = 0
        while i < 5000000:
            i += 1
        print "yielding"
        eventlet.sleep()

eventlet.spawn(busy_loop)

sock = socket.socket()
sock.connect(('localhost', 1234))
sock.send('foo\n' * 10 * 1024 * 1024)
  Now every time the socket isn’t writable, we switch to the busy_loop() greenthread and do some work. Greenthreads must cooperatively yield to one another, so we call eventlet.sleep() in busy_loop() to once again poll the socket to see if it’s writable. Again, if we use the ‘time’ command to run this:
$> time python ./test-eventlet-write.py
yielding
yielding
yielding
...

real    0m5.386s
user    0m5.081s
sys     0m0.088s
  you can see we’re spending very little time sleeping.
  (As an aside, I was going to take a look at gevent, but it doesn’t seem fundamentally different from eventlet. Am I wrong?)
Twisted
  Long, long ago, in times of old, Nova switched from twisted to eventlet so it makes sense to take a quick look at twisted:
from twisted.internet import protocol
from twisted.internet import reactor

class Test(protocol.Protocol):
    def connectionMade(self):
        self.transport.write('foo\n' * 2 * 1024 * 1024)

class TestClientFactory(protocol.ClientFactory):
    def buildProtocol(self, addr):
        return Test()

reactor.connectTCP('localhost', 1234, TestClientFactory())
reactor.run()
  What complicates the example most is twisted’s protocol abstraction, which we need to use simply to write to the socket. The ‘reactor’ abstraction is simply twisted’s name for an event loop. So, we create a non-blocking socket, block in the event loop (using e.g. select()) until the connection completes and then write to the socket. The transport.write() call will actually queue a writer in the reactor, return immediately and, whenever the socket is writable, the writer will continue its work.
  To show how you can run something in parallel, here’s how to run some code in a deferred callback:
def busy_loop():
    i = 0
    while i < 5000000:
        i += 1
    reactor.callLater(0, busy_loop)

reactor.connectTCP(...)
reactor.callLater(0, busy_loop)
reactor.run()
  I’m using a timeout of zero here and it shows up a weakness in both twisted and eventlet – we want this busy_loop() code to only run when the socket isn’t writable. In other words, we want the task to have a lower priority than the writer task. In both twisted and eventlet, the timed tasks are run before the I/O tasks and there is no way to add a task which is only run if there are no runnable I/O tasks.
GLib
  My introduction to async I/O was back when I was working on GNOME (beginning with GNOME’s CORBA ORB, called ORBit) so I can’t help comparing the above abstractions to GLib’s main loop. Here’s some equivalent code:
/* build with gcc -g -O0 -Wall $(pkg-config --libs --cflags glib-2.0) test-glib-write.c -o test-glib-write */

#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <glib.h>

GMainLoop    *main_loop = NULL;
static gchar *strv[10 * 1024 * 1024];
static gchar *data = NULL;
int           remaining = -1;

static gboolean
socket_writable(GIOChannel   *source,
                GIOCondition  condition,
                gpointer      user_data)
{
  int fd, sent;

  fd = g_io_channel_unix_get_fd(source);

  do
    {
      sent = write(fd, data, remaining);
      if (sent == -1)
        {
          if (errno != EAGAIN)
            {
              fprintf(stderr, "Write error: %s\n", strerror(errno));
              goto finished;
            }
          return TRUE;
        }

      data = &data[sent];
      remaining -= sent;
    }
  while (sent > 0 && remaining > 0);

  if (remaining <= 0)
    goto finished;

  return TRUE;

finished:
  g_main_loop_quit(main_loop);
  return FALSE;
}

static gboolean
busy_loop(gpointer data)
{
  int i = 0;
  while (i < 5000000)
    i += 1;
  return TRUE;
}

int
main(int argc, char **argv)
{
  GIOChannel         *io_channel;
  guint               io_watch;
  int                 fd;
  struct sockaddr_in  addr;
  int                 i;
  gchar              *to_free;

  for (i = 0; i < G_N_ELEMENTS(strv)-1; i++)
    strv[i] = "foo\n";
  strv[G_N_ELEMENTS(strv)-1] = NULL;

  data = to_free = g_strjoinv(NULL, strv);
  remaining = strlen(data);

  fd = socket(AF_INET, SOCK_STREAM, 0);

  memset(&addr, 0, sizeof(struct sockaddr_in));
  addr.sin_family      = AF_INET;
  addr.sin_port        = htons(1234);
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

  if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1)
    {
      fprintf(stderr, "Error connecting to server: %s\n", strerror(errno));
      return 1;
    }

  fcntl(fd, F_SETFL, O_NONBLOCK);

  io_channel = g_io_channel_unix_new(fd);
  io_watch = g_io_add_watch(io_channel,
                            G_IO_OUT,
                            (GIOFunc)socket_writable,
                            GINT_TO_POINTER(fd));

  g_idle_add(busy_loop, NULL);

  main_loop = g_main_loop_new(NULL, FALSE);

  g_main_loop_run(main_loop);
  g_main_loop_unref(main_loop);

  g_source_remove(io_watch);
  g_io_channel_unref(io_channel);

  close(fd);

  g_free(to_free);

  return 0;
}
  Here I create a non-blocking socket, set up an ‘I/O watch’ to tell me when the socket is writable and, when it is, I keep blasting data into the socket until I get an EAGAIN. This is the point at which write() would block if it was a blocking socket and I return TRUE from the callback to say “call me again when the socket is writable”. Only when I’ve finished writing all of the data do I return FALSE and quit the main loop causing the g_main_loop_run() call to return.
  The point about task priorities is illustrated nicely here. GLib does have the concept of priorities and has an “idle callback” facility you can use to run some code when no higher priority task is waiting to run. In this case, the busy_loop() function will *only* run when the socket is not writable.
Tulip
  There’s a lot of talk lately about Guido’s Asynchronous IO Support Rebooted (PEP 3156) efforts so, of course, we’ve got to have a look at that.
  One interesting aspect of this effort is that it aims to support both the coroutine and callbacks style programming models. We’ll try out both models below.
  Tulip, of course, has an event loop, time-based callbacks, I/O callbacks and I/O helper functions. We can build a simple variant of our non-blocking I/O example above using tulip’s event loop and I/O callback:
import errno
import select
import socket

import tulip

sock = socket.socket()
sock.connect(('localhost', 1234))
sock.setblocking(0)

buf = memoryview(str.encode('foo\n' * 2 * 1024 * 1024))

def do_write():
    global buf
    while True:
        try:
            buf = buf[sock.send(buf):]
        except socket.error as e:
            if e.errno != errno.EAGAIN:
                raise e
            return

def busy_loop():
    i = 0
    while i < 5000000:
        i += 1
    event_loop.call_soon(busy_loop)

event_loop = tulip.get_event_loop()
event_loop.add_writer(sock, do_write)
event_loop.call_soon(busy_loop)
event_loop.run_forever()
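The do_write() callback above tracks its position in the buffer by slicing a memoryview: each slice advances past the bytes send() reported written without copying the remaining data. A standalone sketch of that idiom (the hard-coded 5 is a stand-in for a real sock.send() return value, not from the original post):

```python
buf = memoryview(b'foo\n' * 3)   # 12 bytes pending

sent = 5                         # pretend sock.send(buf) returned 5
buf = buf[sent:]                 # advance past the sent prefix, zero-copy

assert len(buf) == 7
assert bytes(buf) == b'oo\nfoo\n'
```

Slicing a plain bytes object would copy the unsent tail on every partial write; a memoryview slice is just a new view over the same underlying buffer, which matters when the buffer is megabytes.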
  We can go a step further and use tulip’s Protocol abstraction and connection helper:
import errno
import select
import socket

import tulip

class Protocol(tulip.Protocol):

    buf = b'foo\n' * 10 * 1024 * 1024

    def connection_made(self, transport):
        event_loop.call_soon(busy_loop)
        transport.write(self.buf)
        transport.close()

    def connection_lost(self, exc):
        event_loop.stop()

def busy_loop():
    i = 0
    while i < 5000000:
        i += 1
    event_loop.call_soon(busy_loop)

event_loop = tulip.get_event_loop()
tulip.Task(event_loop.create_connection(Protocol, 'localhost', 1234))
event_loop.run_forever()
  This is pretty similar to the twisted example and shows up yet another example of the lack of task prioritization being an issue. If we added the busy loop to the event loop before the connection completed, the scheduler would run the busy loop every time the connection task yields.
Coroutines, Generators and Subgenerators
  Under the hood, tulip depends heavily on generators to implement coroutines. It’s worth digging into that concept a bit to understand what’s going on.
  Firstly, remind yourself how a generator works:
def gen():
    i = 0
    while i < 2:
        print(i)
        yield
        i += 1

i = gen()
print("yo!")
next(i)
print("hello!")
next(i)
print("bye!")
try:
    next(i)
except StopIteration:
    print("stopped")
  This will print:
yo!
0
hello!
1
bye!
stopped

  Now imagine a generator function which writes to a non-blocking socket and calls yield every time the write would block. You have the beginnings of coroutine based async I/O. To flesh the idea out, here’s a small scheduler driving two generator-based tasks:

import collections
import errno
import select
import socket

sock = socket.socket()
sock.connect(('localhost', 1234))
sock.setblocking(0)

def busy_loop():
    while True:
        i = 0
        while i < 5000000:
            i += 1
        yield

def write():
    buf = memoryview(b'foo\n' * 2 * 1024 * 1024)
    while len(buf):
        try:
            buf = buf[sock.send(buf):]
        except socket.error as e:
            if e.errno != errno.EAGAIN:
                raise e
            yield
    quit()

Task = collections.namedtuple('Task', ['generator', 'wfd', 'idle'])

tasks = [
    Task(busy_loop(), wfd=None, idle=True),
    Task(write(), wfd=sock, idle=False)
]

running = True

def quit():
    global running
    running = False

while running:
    finished = []
    for n, t in enumerate(tasks):
        try:
            next(t.generator)
        except StopIteration:
            finished.append(n)
    map(tasks.pop, finished)

    wfds = [t.wfd for t in tasks if t.wfd]
    timeout = 0 if [t for t in tasks if t.idle] else None

    select.select([], wfds, [], timeout)
  You can see how the generator-based write() and busy_loop() coroutines are cooperatively yielding to one another just like greenthreads in eventlet would do. But there’s a pretty fundamental flaw here – if we wanted to refactor the code above to re-use that write() method, e.g. to call it multiple times with different input, we’d need to do something like:
def write_stuff():
    for i in write(b'foo' * 10 * 1024 * 1024):
        yield
    for i in write(b'bar' * 10 * 1024 * 1024):
        yield
  but that’s pretty darn nasty! Well, that’s exactly the problem that the ‘yield from’ syntax for delegating to a subgenerator (PEP 380) solves. With it, the example becomes:

...
def write(data):
    buf = memoryview(data)
    while len(buf):
        try:
            buf = buf[sock.send(buf):]
        except socket.error as e:
            if e.errno != errno.EAGAIN:
                raise e
            yield

def write_stuff():
    yield from write(b'foo\n' * 2 * 1024 * 1024)
    yield from write(b'bar\n' * 2 * 1024 * 1024)
    quit()

Task = collections.namedtuple('Task', ['generator', 'wfd', 'idle'])

tasks = [
    Task(busy_loop(), wfd=None, idle=True),
    Task(write_stuff(), wfd=sock, idle=False)
]

...
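Stripped of the socket machinery, the effect of ‘yield from’ is easy to see in isolation: the outer generator transparently yields everything its subgenerators yield, so the scheduler driving it never notices the delegation. A minimal standalone illustration (write_part() is a hypothetical stand-in for the write() coroutine above, not from the original post):

```python
def write_part(data):
    # Stand-in for write(): one yield per "would block" pause.
    for chunk in data:
        yield chunk

def write_stuff():
    # Each 'yield from' runs a subgenerator to completion, forwarding
    # every value it yields to our caller.
    yield from write_part(['foo1', 'foo2'])
    yield from write_part(['bar1', 'bar2'])

# The caller sees one flat generator; the delegation is invisible.
assert list(write_stuff()) == ['foo1', 'foo2', 'bar1', 'bar2']
```

This is why the refactored write_stuff() above needs no explicit for-loops: the scheduler keeps calling next() on one generator, and ‘yield from’ routes each resume down into whichever subgenerator is currently active.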
Conclusions?
  Yeah, this is the point where I’ve figured out what we should do in OpenStack. Or not.
  I really like the explicit nature of Tulip’s model – for each async task, you explicitly decide whether to block the current coroutine on its completion (or, put another way, yield to another coroutine until the task has completed) or you register a callback to be notified of the task’s completion. I’d much prefer this to the rather cavalier “don’t worry your little head” approach of hiding the async nature of what’s going on.
  However, the prospect of porting something like Nova to this model is more than a little daunting. If you think about the call stack of a REST API request being handled and ultimately doing an rpc.cast(), the entire call stack would need to be ported to ‘yield from’ in order for us to yield and handle another API request while waiting for the result of rpc.cast() …. as I said, daunting.
  What I’m most interested in is how to design our new messaging API to be able to support any and all of these models in the future. I haven’t quite figured that out either, but it feels pretty doable.
