1 # A source that mimics Unix "tail -f"
2 def unix_tail_f_co(thefile, target):
3 '''
4 target是一个coroutine
5 '''
6 thefile.seek(0, 2) # 跳到文件末尾
7 while True:
8 line = thefile.readline()
9 if not line:
10 time.sleep(0.1)
11 continue
12 # 把数据发送给coroutine进行处理
13 target.send(line)
在上面的代码中,可以看到,target是一个coroutine,函数每次读取一行数据,读取到之后,就调用target.send()函数,把数据发送给了target,由target接收进行下一步的处理。
现在来看看作为sink的printer_co()函数,这个sink很简单,就是简单地打印它收到的数据。
1 # A sink just prints the lines
2 @coroutine
3 def printer_co():
4 while True:
5 # 在这个地方挂起,等待接收数据
6 line = (yield)
7 print line,
其中coroutine函数装饰器使我们在上一篇介绍coroutine的文章中定义的。从代码中可以看到,作为sink,print_co()函数有一个死循环,从第6行可以看到,在这个死循环中,
函数会一直挂起,等到数据的到来,然后每次接收到数据后,打印输出,然后再次挂起等待数据的到来。
现在可以把上面两个函数结合起来实现tail -f命令:
1 f = open("access-log")
2 unix_tail_f_co(f,printer_co())
代码首先打开一个文件f,f作为数据源,把f和printer_co()传递给unix_tail_f_co(),这就实现了一个pipeline,只不过在这个pipeline中,数据是直接发送给作为sink的printer_co()函数的,中间没有经过其他的coroutine。
在sink和source之间,可以根据需要,添加任何的coroutine,比如数据变换(transformation)、过滤(filter)和路由(routing)等
现在,我们添加一个coroutine,grep_filter_co(pattern,target),其中,target是一个coroutine
1 @coroutine
2 def grep_filter_co(pattern,target):
3 while True:
4 # 挂起,等待接收数据
5 line = (yield)
6 if pattern in line:
7 # 接收到的数据如果符合要求,
8 # 则发送给下一个coroutine进行处理
9 target.send(line)
从代码中可以看到,grep_filter_co()有一个死循环,在循环中挂起等待接收数据,一旦接收到了数据,如果数据中存在pattern,则把接收到的数据发送给target,让target对数据进行下一步处理,然后再次等待接收数据并挂起。
同样的,现在把这三个函数组合起来,实现tail -f | grep命令,组成一个新的pipeline:
f = open("access-log")
unix_tail_f_co(f,grep_filter_co("python",printer_co()))
unix_tail_f_co()作为source,从f文件处每次读取一行数据,发送给grep_filter_co()这个coroutine,grep_filer_co()对接收到的数据进行过滤(filter):如果接收到的数据包含了"python"这个单词,就把数据发送给printer_co()进行处理,然后source再把下一行数据发送到pipeline中进行处理。