renheshi 发表于 2015-4-27 10:16:11

Python边学边用--BT客户端实现之(三)Peer协议设计

  与peer建立tcp连接后,首先发送handshake消息进行握手
  handshake消息格式如下:
  一个字节0x19 + 一个字符串'BitTorrent protocol' + 8 byte 保留字节默认值为0(draft中对保留字节有定义)
  + 种子文件中info 部分的sha1字,大小为20个字节 + 20个自己的peer id(从tracker获取到的peer信息大多没有peerid,这个可以使用本地的peer id)
  如果handshake信息协商不上,tcp连接将被关闭。
  
  BT标准BEP-3中定义了8种peer消息:消息格式为msg_len(4个字节) + msg_type(1一个字节) + payload
  0 - choke--发送该消息表示本段发送阻塞,对端将不能获取到piece数据,payload 为 0
  1 - unchoke--发送该消息表示解阻塞,对端可以开始发送请求获取piece数据,payload 为 0
  2 - interested--发送该消息,表示对对端的pieces数据有兴趣,payload 为 0
  3 - not interested---发送该消息,表示对对端的pieces数据没有兴趣了,payload 为 0
  4 - have       ---发送该消息,通告对端 本段拥有的pieces,payload为4个字节的piece index
  5 - bitfield---发送该消息,通告对端 本段拥有的pieces,为bit map的方式表示每个piece index在本端是否拥有。piece index所在bit位为1,表示拥有。
  该消息为handshake成功后的第一个消息。
  6 - request   ---piece请求消息,payload为: index, begin, and length,都是4个字节表示,length一般实现为0x8000, 最大不能超过0x20000。
  7 - piece   ---piece数据,payload为: index, begin,data
  8 - cancel    ---发送该消息,表示本端取消了某个piece请求。payload为:index, begin, and length
  
  使用python的异步socket接口实现,为了减少处理过程被socket阻塞,使用多个线程处理每个peer。
  每个peer包括3个线程:request timeout timer ,socket send data thread, socket receive data thread,使用select 函数判断socket是否可读、可写。
  对socket读写操作时使用RLock进行保护,select阻塞进程时不加锁,避免阻塞其他线程。
  发送数据数据时先写一个队列,然后通过set threading.Event 变量出发socket send data thread发送数据,保证发送数据的线程不阻塞
  由于 python没有结束线程的接口,socket send data thread, socket receive data thread 需要依赖特殊变量的赋值,使socket处理进程结束。
  使用同步调用来触发下载过程运转,尽量不使用timer轮询的方式,可以降低cpu使用率并加快下载过程。
  但是,多线程间的同步调用由于锁的存在,会导致性能下降并容易引入信号量死锁的问题。需要仔细设计好多线程的运行轨迹避免死锁。
  draft BEP中定义的功能暂未实现,peer的上传流控未实现,peer质量分级未实现。


PeerConnect


1 '''
2 Created on 2012-10-3
3
4 @author: ddt
5 '''
6 from socket import *
7 import threading
8 import log_info
9 import select
10
11 class PeerConnect(object):
12   '''
13   TODO: upload flow control
14   TODO: peer quality management
15   '''
16   def __init__(self, ip, port, task_info):
17         '''
18         Constructor
19         '''
20         self.__ip = ip
21         self.__port = port
22         
23         self.__info_hash = task_info.get_info_hash()
24         self.__local_id = task_info.get_local_id()
25         self.__task_info = task_info
26         
27         leading_string = chr(19)+'BitTorrent protocol'
28         reserved_string = chr(0)*8
29         self.__handshake_info = leading_string + reserved_string
30         self.__handshake_info += self.__info_hash + self.__local_id
31   
32         self.__request_piece_len = 0x4000
33         self.__receive_data_len = 0x8000
34         
35         self.__tcp_con = None
36         self.__tcp_connect_timeout = 60
37         
38         self.__tcp_handshake_timeout = 60
39         
40         self.__keepalive_timer = None
41         self.__sck_send_thread = None
42         
43         self.__retry_timer = None
44         self.__retry_intvl = 2 # second
45         self.__retry_times = 0
46         self.__max_retrys = 10
47         
48         self.__local_choked = True
49         self.__peer_choked = True
50         self.__local_interested = False
51         self.__peer_interested = False
52         
53         self.__peer_have_pieces = []
54
55   
56         self.__local_requesting = False;
57         self.__local_requesting_pieces = []
58         self.__local_max_requesting = 10
59         self.__local_requesting_timer = None
60         self.__local_requesting_timeout_intvl = 30
61            
62         self.__receiving_cache = ''
63   
64         self.__peer_pending_request = []
65         self.__local_pending_request = []
66         
67         self.__local_pending_request_less = 10
68         self.__peer_have_pieces_pending = []
69         
70         self.__local_sending_queue = None
71         self.__rlock_sck_send = threading.RLock()
72         self.__local_sending_event = threading.Event()
73         self.__min_sck_send_msg = 0x1000
74         
75         self.__rlock_common = threading.RLock()
76         
77         self.__peer_started = False
78         
79         self.__peer_choked_timer = None
80         self.__peer_choked_timeout_intvl = 180
81         
82         self.__dispatch_timer = None
83         self.__dispatch_timeout = 5
84
85   def start(self):
86         with self.__rlock_common:
87             if not self.__peer_started:
88               self.__retry_times = 0
89               self.__startup_thread = threading.Thread(target=PeerConnect.__connect,args=(self,))
90               self.__peer_started = True
91               self.__startup_thread.start()
92   
93   def stop(self):
94         with self.__rlock_common:
95             if self.__peer_started:
96               self.__retry_times = self.__max_retrys
97               self.__disconnect()
98               self.__peer_started = False
99         pass
100
101   def dispatch_pieces(self,pieces, piece_len):
102         
103         with self.__rlock_common:
104         
105             self.__write_log(str(pieces))
106             self.__write_log(str(self.__peer_have_pieces))
107            
108             if len(pieces) == 0:
109               return False
110            
111             for piece_index in pieces:
112               if piece_index not in self.__peer_have_pieces:
113                     return False
114                  
115             for piece_index in pieces:      
116               for offset in range(0, piece_len, self.__request_piece_len):
117                     length = self.__request_piece_len
118                     if offset+length > piece_len:
119                         length = piece_len - offset
120                     piece = (piece_index, offset, length)
121                     if piece not in self.__local_pending_request:
122                         self.__local_pending_request.append(piece)
123            
124             if self.__dispatch_timer != None:
125               self.__dispatch_timer.cancel()
126            
127             self.__check_local_request()
128            
129             if self.__peer_choked:
130               if self.__peer_choked_timer == None or not self.__peer_choked_timer.is_alive():
131                     self.__peer_choked_timer = threading.Timer(self.__peer_choked_timeout_intvl, PeerConnect.__peer_choked_timeout, )
132                     self.__peer_choked_timer.start()
133         
134            
135         return True
136   
137   def cancel_pieces(self, pieces):
138         with self.__rlock_common:
139             for piece in self.__local_pending_request:
140               if piece in pieces:
141                     self.__local_pending_request.remove(piece)
142                     
143             if self.__local_requesting:
144               for piece in self.__local_requesting_pieces:
145                     if piece in pieces:
146                         self.__send_cancel(piece)
147                         self.__local_requesting_pieces.remove(piece)
148               if len(self.__local_requesting_pieces) == 0:
149                     self.__local_requesting = False
150                     if self.__local_requesting_timer != None:
151                         self.__local_requesting_timer.cancel()      
152               self.__check_local_request()
153
154         
155   def set_choke_state(self, choked):
156         with self.__rlock_common:
157             if self.__local_choked != choked:
158               if choked:
159                     self.__send_choked()
160               else:
161                     self.__send_unchoked()
162                     self.__check_peer_request()
163             pass
164         
165   def get_peer_have_pieces(self):
166         with self.__rlock_common:
167             return self.__peer_have_pieces
168   
169   def notify_local_have_pieces(self, pieces):
170         with self.__rlock_common:
171             self.__send_have(pieces)
172         
173   def is_dead_peer(self):
174         with self.__rlock_common:
175             returnself.__retry_times > self.__max_retrys
176   
177   def get_local_pending_pieces(self):
178         with self.__rlock_common:
179             pieces_index = []
180             for piece in self.__local_pending_request:
181               if piece not in pieces_index:
182                     pieces_index.append(piece)
183             if self.__local_requesting:
184               for piece in self.__local_requesting_pieces:
185                     if piece not in pieces_index:
186                         pieces_index.append(piece)
187
188         return pieces_index
189   
190   def get_peer_addr(self):
191         return (self.__ip, self.__port)
192         
193   def __connect(self):
194
195         self.__tcp_con = socket(AF_INET, SOCK_STREAM)
196         self.__tcp_con.settimeout(self.__tcp_connect_timeout)
197
198         try:
199             self.__tcp_con.connect((self.__ip,self.__port))
200         except error , e:
201             self.__write_log('peer connect error: %s, retry' %e)
202             self.__retry_connect()
203             return
204         
205         self.__tcp_con.settimeout(None)
206         
207         self.__start_send_proc()
208
209         if not self.__handshake():
210             self.__retry_connect()
211             return
212         
213         self.__send_bitfield()
214         self.__send_unchoked()
215         self.__start_keepalive_timer()      
216
217         self.__recv_loop()
218   pass
219
220   def __disconnect(self):
221         self.__write_log('__disconnect:begin')
222         
223         if self.__retry_timer != None:
224             self.__retry_timer.cancel()
225         
226         if self.__keepalive_timer != None:
227             self.__keepalive_timer.cancel()
228            
229         if self.__local_sending_queue != None:
230             self.__local_sending_queue = None
231             self.__local_sending_event.set()
232            
233         if self.__peer_choked_timer != None:
234             self.__peer_choked_timer.cancel()
235            
236         if self.__dispatch_timer != None:
237             self.__dispatch_timer.cancel()   
238
239         
240         if self.__local_requesting:
241             self.__local_requesting = False
242             self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
243             self.__local_requesting_pieces = []
244             if self.__local_requesting_timer != None:
245               self.__local_requesting_timer.cancel()
246         
247         self.__tcp_con.close()
248         self.__write_log('__disconnect: self.__tcp_con.closed')
249         
250         self.__receiving_cache = ''
251         
252         self.__local_choked = True
253         self.__peer_choked = True
254         self.__local_interested = False
255         self.__peer_interested = False
256         self.__local_requesting_pieces = []
257         self.__peer_pending_request = []
258         self.__peer_have_pieces = []
259         self.__peer_have_pieces_pending = []
260         pass
261   
262   def __start_keepalive_timer(self):
263         if self.__keepalive_timer != None:
264             self.__keepalive_timer.cancel()
265         self.__keepalive_timer = threading.Timer(120,PeerConnect.__send_keepalive_timeout,)
266         self.__keepalive_timer.start()
267         
268   def __send_keepalive_timeout(self):
269         
270         with self.__rlock_common:
271             self.__send_keepalive()
272             self.__start_keepalive_timer()
273         
274   def __recv_loop(self):
275         self.__tcp_con.setblocking(False)
276         while True:
277             ready_r, ready_w, in_err = select.select(, [], , 600)
278            
279             with self.__rlock_common:
280               if self.__tcp_con in in_err:
281                     self.__write_log('__recv_loop: socket in error select result:%s' %str(in_err))
282                     self.__retry_connect()
283                     break
284               
285               if self.__tcp_con not in ready_r:
286                     self.__write_log('__recv_loop: unexpected select result!')
287                     continue
288               
289               try:
290                     received_data = self.__tcp_con.recv(self.__receive_data_len)
291                        
292               except error, e:
293                     self.__write_log("receive data failed, error:%s, retry" %e)
294                     self.__retry_connect()
295                     break
296            
297               if len(received_data) == 0:
298                     self.__write_log("have received null data")
299                     self.__retry_connect()
300                     break
301            
302               self.__reveived_data(received_data)
303         pass
304   
305   def __start_send_proc(self):
306         with self.__rlock_sck_send:
307             self.__local_sending_queue = ''
308         self.__sck_send_thread = threading.Thread(target=PeerConnect.__proc_sending, args=(self,))
309         self.__sck_send_thread.start()
310         
311   def __proc_sending(self):
312         while self.__local_sending_queue != None:
313                        
314             ready_r, ready_w, in_err = select.select([], , )
315            
316             self.__local_sending_event.wait()
317
318             with self.__rlock_common:
319               
320               if self.__tcp_conin in_err:
321                     self.__tcp_con.close()
322                     break
323               
324               if self.__tcp_con not in ready_w:
325                     self.__write_log('__proc_sending: unexpected select result!')
326                     continue
327               
328               if self.__local_sending_queue == None:
329                     break
330               
331               try:
332                     sent_len = self.__tcp_con.send(self.__local_sending_queue)
333                     self.__local_sending_queue = self.__local_sending_queue
334                     
335               except error,e:
336                     self.__tcp_con.close()
337                     self.__write_log('__proc_sending failed! error:%s' %str(e))
338                     break
339                     
340               if len(self.__local_sending_queue) == 0:
341                     self.__local_sending_event.clear()
342         pass
343   
344   def __check_peer_request(self):
345         if self.__peer_interested and not self.__local_choked:
346             while len(self.__peer_pending_request) > 0:
347               piece = self.__peer_pending_request.pop(0)
348               piece_index, offset, length = piece
349               if self.__local_have(piece_index):
350                     data = self.__read_piecedata(piece_index,offset, length)
351                     self.__send_piece(piece_index, offset, data)
352               else:
353                     self.__write_log('peer request piece:%d not have.' %piece_index)
354         pass
355
356   def __check_local_request(self):
357         with self.__rlock_common:
358             self.__check_interested()
359            
360             if self.__local_requesting and len(self.__local_requesting_pieces) >= self.__local_max_requesting:
361               return
362            
363             if len(self.__local_pending_request) != 0:
364               if not self.__local_interested:
365                     self.__send_interested()
366             else:
367               if len(self.__peer_have_pieces) != 0:
368                     if self.__dispatch_timer != None:
369                         self.__dispatch_timer.cancel()
370                        
371                     self.__dispatch_timer = threading.Timer(self.__dispatch_timeout,PeerConnect.__check_local_request ,)
372                     self.__dispatch_timer.start()
373                     self.__local_interested = False
374                     self.__notify_pieces_completed()
375               return
376            
377             if self.__peer_choked:
378               return
379         
380             adding_piece = self.__local_pending_request.pop(0)
381             if adding_piece not in self.__peer_have_pieces:
382               for piece in self.__local_pending_request:
383                     if piece == adding_piece:
384                         self.__local_pending_request.remove(piece)
385               self.__notify_pieces_canceled(,])
386               self.__check_local_request()
387             else:
388               self.__local_requesting = True
389               self.__local_requesting_pieces.append(adding_piece)   
390               self.__send_request(adding_piece)
391               self.__check_local_request()
392               
393               if self.__local_requesting_timer == None or not self.__local_requesting_timer.is_alive():
394                     self.__local_requesting_timer = threading.Timer(self.__local_requesting_timeout_intvl, PeerConnect.__local_requesting_timeout, )
395                     self.__local_requesting_timer.start()
396             pass
397   
398   def __local_requesting_timeout(self):
399         with self.__rlock_common:
400             if self.__local_requesting:
401               self.__local_requesting = False
402               self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
403               self.__local_requesting_pieces = []
404               self.__local_interested = False
405               self.__check_local_request()
406
407         pass
408   
409   def __peer_choked_timeout(self):
410         with self.__rlock_common:
411             if self.__peer_choked:
412               pending_pieces = []
413               for piece in self.__local_pending_request:
414                     if piece not in pending_pieces:
415                         pending_pieces.append(piece)
416               if len(pending_pieces) != 0:
417                     self.__notify_pieces_canceled(pending_pieces)
418                     self.__local_pending_request = []
419         
420                     
421   
422   def __check_interested(self):
423         if not self.__local_requesting:
424             if len(self.__local_pending_request) != 0 and not self.__local_interested:
425               self.__send_interested()
426               
427         if not self.__local_requesting and len(self.__local_pending_request) == 0:
428             if self.__local_interested:
429               self.__send_notintrested()
430         pass
431               
432   def __retry_connect(self):
433         
434         self.__write_log('__retry_connect')
435         pending_pieces = []
436         peer_dead = False
437         
438         with self.__rlock_common:
439             self.__disconnect()
440            
441             for piece in self.__local_pending_request:
442               if piece not in pending_pieces:
443                     pending_pieces.append(piece)
444                     
445             self.__retry_times += 1
446             if self.__retry_times > self.__max_retrys:
447               peer_dead = True
448
449             else:
450               self.__retry_timer = threading.Timer(self.__retry_intvl**self.__retry_times, PeerConnect.__connect, )
451               self.__retry_timer.start()
452               
453         if peer_dead == True:      
454             self.__notify_peer_dead()
455            
456         if len(pending_pieces) != 0:
457             self.__notify_pieces_canceled(pending_pieces)
458             self.__local_pending_request = []
459
460   def __reveived_data(self, data):
461         self.__receiving_cache += data
462            
463         while len(self.__receiving_cache) >= 4:
464             msg_len = _str_ntohl(self.__receiving_cache)
465            
466             if (len(self.__receiving_cache)-4) >= msg_len:
467               self.__proc_msg(self.__receiving_cache)
468               self.__receiving_cache = data
469             else:
470               break
471                     
472               
473   def __proc_msg(self, msg):
474         with self.__rlock_common:
475             if len(msg) == 0:
476               self.__received_keepalive()
477             else:
478               msg_type = msg
479               if msg_type == chr(0):
480                     self.__received_choked()
481               elif msg_type == chr(1):
482                     self.__received_unchoked()
483               elif msg_type == chr(2):
484                     self.__received_interested()
485               elif msg_type == chr(3):
486                     self.__received_notinterested()
487               elif msg_type == chr(4):
488                     self.__received_have(msg)                              
489               elif msg_type == chr(5):
490                     self.__received_bitfield(msg)
491               elif msg_type == chr(6):
492                     self.__received_request(msg)            
493               elif msg_type == chr(7):
494                     self.__received_piece(msg)
495               elif msg_type == chr(8):
496                     self.__received_cancel(msg)
497               else:
498                     self.__write_log('received unknown msg :%s' %list(msg))
499   
500   def__handshake(self):
501         self.__write_log('send handshake: %s' %list(self.__handshake_info))
502         self.__tcp_con.sendall(self.__handshake_info)
503         
504         try:
505             self.__tcp_con.settimeout(self.__tcp_handshake_timeout)
506             rsp = self.__tcp_con.recv(68)
507            
508             if len(rsp) != 68:
509               return False
510            
511             self.__tcp_con.settimeout(None)
512             self.__write_log('recived handshake rsp: %s' %list(rsp))
513             self.__peer_id = rsp
514             self.__write_log('peer_id:%s' %self.__peer_id)
515            
516         except (error,timeout), e:
517             self.__write_log('handshake failed, error:%s' %e)
518             return False
519         return True
520   
521   def __received_keepalive(self):
522         self.__write_log('received keepalive')
523         #self.__send_keepalive()
524         pass
525            
526   def __received_choked(self):
527
528         self.__peer_choked = True
529         
530         if self.__local_requesting:
531             self.__local_requesting = False
532             self.__local_pending_request =self.__local_requesting_pieces + self.__local_pending_request
533             self.__local_requesting_pieces = []
534            
535         self.__notify_peer_choked()
536         
537         if len(self.__local_pending_request) != 0:
538             pending_pieces = []
539             for piece in self.__local_pending_request:
540               if piece not in pending_pieces:
541                     pending_pieces.append(piece)
542             self.__notify_pieces_canceled(pending_pieces)
543             self.__local_pending_request = []
544             self.__local_interested = False
545            
546         self.__write_log('received choked')
547         
548   def __received_unchoked(self):
549         self.__peer_choked = False
550         
551         self.__notify_peer_unchoked()
552         
553         #if len(self.__local_pending_request) < self.__local_pending_request_less:
554         self.__check_local_request()
555         
556         if self.__peer_choked_timer != None:
557             self.__peer_choked_timer.cancel()
558            
559         self.__write_log('received unchoked')
560         
561   
562   def __received_interested(self):
563         self.__peer_interested = True
564         #self.__send_unchoked()
565         self.__write_log('received interested')
566         
567   def __received_notinterested(self):
568         self.__peer_interested = False
569         self.__peer_pending_request = []
570         self.__write_log('received notinterested')
571         
572   def __received_have(self, data):
573         '''
574         TODO:Notify peer have pieces changed
575         '''
576         piece_index = _str_ntohl(data)
577         if piece_index not in self.__peer_have_pieces:
578             self.__peer_have_pieces.append(piece_index)
579             self.__peer_have_pieces_pending.append(piece_index)
580             if len(self.__local_pending_request) < self.__local_pending_request_less:
581               self.__notify_pieces_have(self.__peer_have_pieces_pending)
582               self.__peer_have_pieces_pending = []
583            
584            
585         self.__write_log('received have piece:%d' %piece_index)
586
587   def __received_bitfield(self, data):
588         '''
589         '''
590         bitfield_len = len(data)
591         for i in range(0,bitfield_len):
592             byte = data
593             for j in range(0,8):
594               byte_mask = 116)%0x100)
799   msg += chr((integer>>8)%0x100)
800   msg += chr(integer%0x100)
801   return msg
802
803 def _str_ntohl(msg):
804   integer = 0
805   integer += ord(msg)
页: [1]
查看完整版本: Python边学边用--BT客户端实现之(三)Peer协议设计