Python高级编程之生成器(Generator)与coroutine(四):一个简单的多任务系统
啊,终于要把这一个系列写完整了,好高兴啊在前面的三篇文章中介绍了Python的Python的Generator和coroutine(协程)相关的编程技术,接下来这篇文章会用Python的coroutine技术实现一个简单的多任务的操作系统
代码如下,可看注释
1 #-*-coding:utf-8 -*-
2 '''
3 用Python和coroutine实现一个简单的多任务系统
4 '''
5 # ##Step 1:Define Tasks###################################
6 import select
7 class Task(object):
8 taskid = 0
9
10 def __init__(self,target):
11 Task.taskid += 1
12 self.tid = Task.taskid # Task id
13 self.target = target # Target coroutine
14 self.sendval = None # Value to send
15
16 def run(self):
17 return self.target.send(self.sendval)
18 # ###############################################
19
20 # ##Step 2:The Scheduler#########################
21 import Queue
22 class Scheduler(object):
23 def __init__(self):
24 self.ready = Queue.Queue()
25 self.taskmap = {}
26
27 # 正在等待的Tasks,key是taskid
28 self.exit_waiting = {}
29
30 # 异步IO
31 # Holding areas for tasks blocking on I/O.These are
32 # dictionaries mapping file descriptions to tasks
33 # 键值为文件描述符
34 self.read_waiting = {}
35 self.write_waiting = {}
36
37
38 def iotask(self):
39 while True:
40 if self.ready.empty():
41 # 如果ready为空,表示没有正在等待执行的队列
42 # timeout 为None,表示不关心任何文件描述符的变化
43 self.iopool(None)
44 else:
45 # ready不为空,则设置select函数不管文件描述符是否发生变化都立即返回
46 self.iopool(0)
47 yield
48
49
50 def new(self,target):
51 newtask = Task(target)
52 self.taskmap = newtask
53 self.schedule(newtask)
54 return newtask.tid
55
56 def schedule(self,task):
57 # 把task放到任务队列中去
58 self.ready.put(task)
59
60 def exit(self,task):
61 print "Task %d terminated" %task.tid
62 del self.taskmap
63 # Notify other tasks waiting for exit
64 # 把正在等待的任务加入到正在执行的队列中去
65 for task in self.exit_waiting.pop(task.tid,[]):
66 self.schedule(task)
67
68 def waitforexit(self,task,waittid):
69 '''
70 让一个任务等待另外一个任务,把这个任务加入到exit_waiting中去
71 返回True表示这个task正在等待队列中
72 '''
73 if waittid in self.taskmap:
74 self.exit_waiting.setdefault(waittid,[]).append(task)
75 return True
76 else:
77 return False
78
79
80 def waitforread(self,task,fd):
81 '''
82 functions that simply put a task into to
83 one of the above dictionaries
84 '''
85 self.read_waiting = task
86
87 def waitforwrite(self,task,fd):
88 self.write_waiting = task
89
90 def iopool(self,timeout):
91 '''
92 I/O Polling.Use select() to determine which file
93 descriptors can be used.Unblock any associated task
94 '''
95 if self.read_waiting or self.write_waiting:
96 # 获取I/O事件,一旦获取到,就放入到执行队列中取,等待执行
97 r,w,e = select.select(self.read_waiting,
98 self.write_waiting,[],timeout)
99 for fd in r:
100 self.schedule(self.read_waiting.pop(fd))
101
102 for fd in w:
103 self.schedule(self.write_waiting.pop(fd))
104
105 def mainloop(self):
106 self.new(self.iotask())# Launch I/O polls
107 while self.taskmap:
108 task = self.ready.get()
109 try:
110 result = task.run()
111 # 如果task执行的是System call,则对当前环境进行保存
112 # 然后在执行System Call
113 if isinstance(result,SystemCall):
114 # 把当前的环境保存,即保存当前运行的task和sched
115 result.task = task
116 result.sched = self
117 result.handle()
118 continue
119 except StopIteration:
120 self.exit(task)
121 # print("task is over")
122 continue
123 self.schedule(task)
124 # ##Step 2:The Scheduler#########################
125
126
127 # ##SystemCall#########################
128 class SystemCall(object):
129 '''
130 所有系统调用的基类,继承自该类的类要重写handle函数
131 '''
132 def handle(self):
133 pass
134
135
136 class GetTid(SystemCall):
137 '''
138 获取任务ID
139 '''
140 def handle(self):
141 self.task.sendval = self.task.tid
142 self.sched.schedule(self.task)
143
144
145 class NewTask(SystemCall):
146 '''
147 新建一个Task
148 '''
149 def __init__(self,target):
150 self.target = target
151
152 def handle(self):
153 # 在这里把target封装成Task
154 # 是在这里把新生成的task加入到执行队列当中去
155 tid = self.sched.new(self.target)
156 self.task.sendval = tid
157 # 把执行这个系统调用的父task重新加入到执行队列中去
158 # 这一点很关键,因为判断一个task是否结束是通过taskmap的
159 # 这个task只是暂时被挂起,要重新放到queue中去
160 self.sched.schedule(self.task)
161
162 class KillTask(SystemCall):
163 '''
164 杀死一个Task
165 '''
166 def __init__(self,tid):
167 self.tid = tid
168
169 def handle(self):
170 task = self.sched.taskmap.get(self.tid,None)
171 # task指的是要被kill掉的那个task
172 # self.task指的是发起KillTask这个系统调用task
173 if task:
174 task.target.close()
175 self.task.sendval = None
176 else:
177 self.task.sendval = False
178 # target.close()只是产生一个StopIteration异常
179 self.sched.schedule(self.task)
180
181
182 class WaitTask(SystemCall):
183 '''
184 让任务进行等待 系统调用
185 '''
186 def __init__(self,tid):
187 self.tid = tid
188
189 def handle(self):
190 result = self.sched.waitforexit(self.task,self.tid)
191 self.task.sendval = result
192 # 如果等待的是一个不存在的task,则立即返回
193 if notresult:
194 self.sched.schedule(self.task)
195
196
197
198
199 class ReadWait(SystemCall):
200 '''
201 异步读 系统调用
202 '''
203 def __init__(self,f):
204 self.f = f
205
206 def handle(self):
207 fd = self.f.fileno()
208 self.sched.waitforread(self.task,fd)
209
210 class WriteWait(SystemCall):
211 '''
212 异步写 系统调用
213 '''
214 def _init__(self,f):
215 self.f = f
216
217 def handle(self):
218 fd = self.f.fileno()
219 self.sched.waitforwrite(self.task,fd)
页:
[1]