我们在上一章中看到使用threading.Thread.join()可以避免主程序在等待其它线程结束的过程中得到时间片。事实上,在线程编程中经常会遇到一个线程需要等待另一个线程中的情况。在这种情况下,需要重申的是,我们不希望等待的线程获得时间片。
解决的办法是使用条件变量,就像它的名称,代码使用它们来等待一些条件的成立。大多数线程系统都支持条件变量,python的threading包也不例外。
例如,pthreads包,使用pthread_cond来代表这种变量,并且有函数pthread_cond_wait(),它代表一个线程调用等待一个事件的发生;另一个函数,pthread_cond_signal(),另一个线程调用,代表声明等待的线程事件已经发生。
python中的条件变量较之C语言中的来说更容易使用,对于第一层次的条件变量来说,可以使用类threading.Condition,它能够在大多数的线程系统中良好的工作,但是使用这种条件变量需要进行额外的解锁和闭锁操作。
python提供了一种更高层次的类,threading.Event,它是对于threading.Condition的一种封装,而且在后台自动处理有关的锁操作。
接下去的例子将使用多线程来实现一个端口扫描器,多端口并行扫描,每个端口对应一个线程,同时程序可以控制线程的个数。如果主线程准备对一个新的端口进行扫描操作但是已经达到线程数的上限时,主线程会等待线程数减少到线程数上限以下再进行新端口的扫描,这个步骤通过条件变量来实现。
1 # portscanner.py: checks for active ports on a given machine; would be
2 # more realistic if checked several hosts at once; different threads
3 # check different ports; there is a self-imposed limit on the number of
4 # threads, and the event mechanism is used to wait if that limit is
5 # reached
6
7 # usage: python portscanner.py host maxthreads
8
9 import sys, threading, socket
10
11 class scanner(threading.Thread):
12 tlist = [] # list of all current scanner threads
13 maxthreads = int(sys.argv[2]) # max number of threads we’re allowing
14 evnt = threading.Event() # event to signal OK to create more threads
15 lck = threading.Lock() # lock to guard tlist
16 def __init__(self,tn,host):
17 threading.Thread.__init__(self)
18 self.threadnum = tn # thread ID/port number
19 self.host = host # checking ports on this host
20 def run(self):
21 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
22 try:
23 s.connect((self.host, self.threadnum))
24 print "%d: successfully connected" % self.threadnum
25 s.close()
26 except:
27 print "%d: connection failed" % self.threadnum
28 # thread is about to exit; remove from list, and signal OK if we
29 # had been up against the limit
30 scanner.lck.acquire()
31 scanner.tlist.remove(self)
32 print "%d: now active --" % self.threadnum, scanner.tlist
33 if len(scanner.tlist) == scanner.maxthreads-1:
34 scanner.evnt.set()
35 scanner.evnt.clear()
36 scanner.lck.release()
37 def newthread(pn,hst):
38 scanner.lck.acquire()
39 sc = scanner(pn,hst)
40 scanner.tlist.append(sc)
41 scanner.lck.release()
42 sc.start()
43 print "%d: starting check" % pn
44 print "%d: now active --" % pn, scanner.tlist
45 newthread = staticmethod(newthread)
46
47 def main():
13
48 host = sys.argv[1]
49 for i in range(1,100):
50 scanner.lck.acquire()
51 print "%d: attempting check" % i
52 # check to see if we’re at the limit before starting a new thread
53 if len(scanner.tlist) >= scanner.maxthreads:
54 # too bad, need to wait until not at thread limit
55 print "%d: need to wait" % i
56 scanner.lck.release()
57 scanner.evnt.wait()
58 else:
59 scanner.lck.release()
60 scanner.newthread(i,host)
61 for sc in scanner.tlist:
62 sc.join()
63
64 if __name__ == ’__main__’:
65 main()