设为首页 收藏本站
查看: 901|回复: 0

[经验分享] Python高级编程之生成器(Generator)与coroutine(四):一个简单的多任务系统

[复制链接]
发表于 2015-12-2 14:46:21 | 显示全部楼层 |阅读模式
  啊,终于要把这一个系列写完整了,好高兴啊

  在前面的三篇文章中介绍了Python的Python的Generator和coroutine(协程)相关的编程技术,接下来这篇文章会用Python的coroutine技术实现一个简单的多任务的操作系统
  代码如下,可看注释



  1 #-*-coding:utf-8 -*-
  2 '''
  3 用Python和coroutine实现一个简单的多任务系统
  4 '''
  5 # ##Step 1:Define Tasks###################################
  6 import select
  7 class Task(object):
  8     taskid = 0
  9
10     def __init__(self,target):
11         Task.taskid += 1
12         self.tid = Task.taskid    # Task id
13         self.target = target      # Target coroutine
14         self.sendval = None       # Value to send
15
16     def run(self):
17         return self.target.send(self.sendval)
18 # ###############################################
19
20 # ##Step 2:The Scheduler#########################
21 import Queue
22 class Scheduler(object):
23     def __init__(self):
24         self.ready = Queue.Queue()
25         self.taskmap = {}
26
27         # 正在等待的Tasks,key是taskid
28         self.exit_waiting = {}
29
30         # 异步IO
31         # Holding areas for tasks blocking on I/O.These are
32         # dictionaries mapping file descriptions to tasks
33         # 键值为文件描述符
34         self.read_waiting = {}
35         self.write_waiting = {}
36
37
38     def iotask(self):
39         while True:
40             if self.ready.empty():
41                 # 如果ready为空,表示没有正在等待执行的队列
42                 # timeout 为None,表示不关心任何文件描述符的变化
43                 self.iopool(None)
44             else:
45                 # ready不为空,则设置select函数不管文件描述符是否发生变化都立即返回
46                 self.iopool(0)
47             yield
48
49
50     def new(self,target):
51         newtask = Task(target)
52         self.taskmap[newtask.tid] = newtask
53         self.schedule(newtask)
54         return newtask.tid
55
56     def schedule(self,task):
57         # 把task放到任务队列中去
58         self.ready.put(task)
59
60     def exit(self,task):
61         print "Task %d terminated" %task.tid
62         del self.taskmap[task.tid]
63         # Notify other tasks waiting for exit
64         # 把正在等待的任务加入到正在执行的队列中去
65         for task in self.exit_waiting.pop(task.tid,[]):
66             self.schedule(task)
67
68     def waitforexit(self,task,waittid):
69         '''
70         让一个任务等待另外一个任务,把这个任务加入到exit_waiting中去
71         返回True表示这个task正在等待队列中
72         '''
73         if waittid in self.taskmap:
74             self.exit_waiting.setdefault(waittid,[]).append(task)
75             return True
76         else:
77             return False
78
79
80     def waitforread(self,task,fd):
81         '''
82         functions that simply put a task into to
83         one of the above dictionaries
84         '''
85         self.read_waiting[fd] = task
86
87     def waitforwrite(self,task,fd):
88         self.write_waiting[fd] = task
89
90     def iopool(self,timeout):
91         '''
92         I/O Polling.Use select() to determine which file
93         descriptors can be used.Unblock any associated task
94         '''
95         if self.read_waiting or self.write_waiting:
96             # 获取I/O事件,一旦获取到,就放入到执行队列中取,等待执行
97             r,w,e = select.select(self.read_waiting,
98                                   self.write_waiting,[],timeout)
99             for fd in r:
100                 self.schedule(self.read_waiting.pop(fd))
101
102             for fd in w:
103                 self.schedule(self.write_waiting.pop(fd))
104
105     def mainloop(self):
106         self.new(self.iotask())  # Launch I/O polls
107         while self.taskmap:
108             task = self.ready.get()
109             try:
110                 result = task.run()
111                 # 如果task执行的是System call,则对当前环境进行保存
112                 # 然后在执行System Call
113                 if isinstance(result,SystemCall):
114                     # 把当前的环境保存,即保存当前运行的task和sched
115                     result.task = task
116                     result.sched = self
117                     result.handle()
118                     continue
119             except StopIteration:
120                 self.exit(task)
121                 # print("task is over")
122                 continue
123             self.schedule(task)
124 # ##Step 2:The Scheduler#########################
125
126
127 # ##SystemCall#########################
128 class SystemCall(object):
129     '''
130     所有系统调用的基类,继承自该类的类要重写handle函数
131     '''
132     def handle(self):
133         pass
134
135
136 class GetTid(SystemCall):
137     '''
138     获取任务ID
139     '''
140     def handle(self):
141         self.task.sendval = self.task.tid
142         self.sched.schedule(self.task)
143
144
145 class NewTask(SystemCall):
146     '''
147     新建一个Task
148     '''
149     def __init__(self,target):
150         self.target = target
151
152     def handle(self):
153         # 在这里把target封装成Task
154         # 是在这里把新生成的task加入到执行队列当中去
155         tid = self.sched.new(self.target)
156         self.task.sendval = tid
157         # 把执行这个系统调用的父task重新加入到执行队列中去
158         # 这一点很关键,因为判断一个task是否结束是通过taskmap的
159         # 这个task只是暂时被挂起,要重新放到queue中去
160         self.sched.schedule(self.task)
161
162 class KillTask(SystemCall):
163     '''
164     杀死一个Task
165     '''
166     def __init__(self,tid):
167         self.tid = tid
168
169     def handle(self):
170         task = self.sched.taskmap.get(self.tid,None)
171         # task指的是要被kill掉的那个task
172         # self.task指的是发起KillTask这个系统调用task
173         if task:
174             task.target.close()
175             self.task.sendval = None
176         else:
177             self.task.sendval = False
178         # target.close()只是产生一个StopIteration异常
179         self.sched.schedule(self.task)
180
181
182 class WaitTask(SystemCall):
183     '''
184     让任务进行等待 系统调用
185     '''
186     def __init__(self,tid):
187         self.tid = tid
188
189     def handle(self):
190         result = self.sched.waitforexit(self.task,self.tid)
191         self.task.sendval = result
192         # 如果等待的是一个不存在的task,则立即返回
193         if not  result:
194             self.sched.schedule(self.task)
195
196
197
198
199 class ReadWait(SystemCall):
200     '''
201     异步读 系统调用
202     '''
203     def __init__(self,f):
204         self.f = f
205
206     def handle(self):
207         fd = self.f.fileno()
208         self.sched.waitforread(self.task,fd)
209
210 class WriteWait(SystemCall):
211     '''
212     异步写 系统调用
213     '''
214     def _init__(self,f):
215         self.f = f
216
217     def handle(self):
218         fd = self.f.fileno()
219         self.sched.waitforwrite(self.task,fd)
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-146426-1-1.html 上篇帖子: python使用smtplib和email发送腾讯企业邮箱邮件 下篇帖子: Linux下Python科学计算包numpy和SciPy的安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表