设为首页 收藏本站
查看: 845|回复: 0

[经验分享] Python边学边用--BT客户端实现之(三)Peer协议设计

[复制链接]

尚未签到

发表于 2015-4-27 10:16:11 | 显示全部楼层 |阅读模式
  与peer建立tcp连接后,首先发送handshake消息进行握手
  handshake消息格式如下:
  一个字节0x19 + 一个字符串'BitTorrent protocol' + 8 byte 保留字节默认值为0(draft中对保留字节有定义)
  + 种子文件中info 部分的sha1字,大小为20个字节 + 20个自己的peer id(从tracker获取到的peer信息大多没有peerid,这个可以使用本地的peer id)
  如果handshake信息协商不上,tcp连接将被关闭。
  
  BT标准BEP-3中定义了8种peer消息:消息格式为msg_len(4个字节) + msg_type(1一个字节) + payload
  0 - choke  --发送该消息表示本段发送阻塞,对端将不能获取到piece数据,payload 为 0
  1 - unchoke  --发送该消息表示解阻塞,对端可以开始发送请求获取piece数据,payload 为 0
  2 - interested  --发送该消息,表示对对端的pieces数据有兴趣,payload 为 0
  3 - not interested  ---发送该消息,表示对对端的pieces数据没有兴趣了,payload 为 0
  4 - have       ---发送该消息,通告对端 本段拥有的pieces,payload为4个字节的piece index
  5 - bitfield  ---发送该消息,通告对端 本段拥有的pieces,为bit map的方式表示每个piece index在本端是否拥有。piece index所在bit位为1,表示拥有。
  该消息为handshake成功后的第一个消息。
  6 - request   ---piece请求消息,payload为: index, begin, and length,都是4个字节表示,length一般实现为0x8000, 最大不能超过0x20000。
  7 - piece     ---piece  数据,payload为: index, begin,data
  8 - cancel    ---发送该消息,表示本端取消了某个piece请求。payload为:index, begin, and length
  
  使用python的异步socket接口实现,为了减少处理过程被socket阻塞,使用多个线程处理每个peer。
  每个peer包括3个线程:request timeout timer ,socket send data thread, socket receive data thread,使用select 函数判断socket是否可读、可写。
  对socket读写操作时使用RLock进行保护,select阻塞进程时不加锁,避免阻塞其他线程。
  发送数据数据时先写一个队列,然后通过set threading.Event 变量出发socket send data thread发送数据,保证发送数据的线程不阻塞
  由于 python没有结束线程的接口,socket send data thread, socket receive data thread 需要依赖特殊变量的赋值,使socket处理进程结束。
  使用同步调用来触发下载过程运转,尽量不使用timer轮询的方式,可以降低cpu使用率并加快下载过程。
  但是,多线程间的同步调用由于锁的存在,会导致性能下降并容易引入信号量死锁的问题。需要仔细设计好多线程的运行轨迹避免死锁。
  draft BEP中定义的功能暂未实现,peer的上传流控未实现,peer质量分级未实现。


DSC0000.gif DSC0001.gif PeerConnect


  1 '''
  2 Created on 2012-10-3
  3
  4 @author: ddt
  5 '''
  6 from socket import *
  7 import threading
  8 import log_info
  9 import select
10
11 class PeerConnect(object):
12     '''
13     TODO: upload flow control
14     TODO: peer quality management
15     '''
16     def __init__(self, ip, port, task_info):
17         '''
18         Constructor
19         '''
20         self.__ip = ip
21         self.__port = port
22         
23         self.__info_hash = task_info.get_info_hash()
24         self.__local_id = task_info.get_local_id()
25         self.__task_info = task_info
26         
27         leading_string = chr(19)+'BitTorrent protocol'
28         reserved_string = chr(0)*8
29         self.__handshake_info = leading_string + reserved_string
30         self.__handshake_info += self.__info_hash + self.__local_id
31     
32         self.__request_piece_len = 0x4000
33         self.__receive_data_len = 0x8000
34         
35         self.__tcp_con = None
36         self.__tcp_connect_timeout = 60
37         
38         self.__tcp_handshake_timeout = 60
39         
40         self.__keepalive_timer = None
41         self.__sck_send_thread = None
42         
43         self.__retry_timer = None
44         self.__retry_intvl = 2 # second
45         self.__retry_times = 0
46         self.__max_retrys = 10
47         
48         self.__local_choked = True
49         self.__peer_choked = True
50         self.__local_interested = False
51         self.__peer_interested = False
52         
53         self.__peer_have_pieces = []
54
55     
56         self.__local_requesting = False;
57         self.__local_requesting_pieces = []
58         self.__local_max_requesting = 10
59         self.__local_requesting_timer = None
60         self.__local_requesting_timeout_intvl = 30
61            
62         self.__receiving_cache = ''
63     
64         self.__peer_pending_request = []
65         self.__local_pending_request = []
66         
67         self.__local_pending_request_less = 10
68         self.__peer_have_pieces_pending = []
69         
70         self.__local_sending_queue = None
71         self.__rlock_sck_send = threading.RLock()
72         self.__local_sending_event = threading.Event()
73         self.__min_sck_send_msg = 0x1000
74         
75         self.__rlock_common = threading.RLock()
76         
77         self.__peer_started = False
78         
79         self.__peer_choked_timer = None
80         self.__peer_choked_timeout_intvl = 180
81         
82         self.__dispatch_timer = None
83         self.__dispatch_timeout = 5
84
85     def start(self):
86         with self.__rlock_common:
87             if not self.__peer_started:
88                 self.__retry_times = 0
89                 self.__startup_thread = threading.Thread(target=PeerConnect.__connect,args=(self,))
90                 self.__peer_started = True
91                 self.__startup_thread.start()
92     
93     def stop(self):
94         with self.__rlock_common:
95             if self.__peer_started:
96                 self.__retry_times = self.__max_retrys
97                 self.__disconnect()
98                 self.__peer_started = False
99         pass
100
101     def dispatch_pieces(self,pieces, piece_len):
102         
103         with self.__rlock_common:
104         
105             self.__write_log(str(pieces))
106             self.__write_log(str(self.__peer_have_pieces))
107            
108             if len(pieces) == 0:
109                 return False
110            
111             for piece_index in pieces:
112                 if piece_index not in self.__peer_have_pieces:
113                     return False
114                  
115             for piece_index in pieces:      
116                 for offset in range(0, piece_len, self.__request_piece_len):
117                     length = self.__request_piece_len
118                     if offset+length > piece_len:
119                         length = piece_len - offset
120                     piece = (piece_index, offset, length)
121                     if piece not in self.__local_pending_request:
122                         self.__local_pending_request.append(piece)
123            
124             if self.__dispatch_timer != None:
125                 self.__dispatch_timer.cancel()
126            
127             self.__check_local_request()
128            
129             if self.__peer_choked:
130                 if self.__peer_choked_timer == None or not self.__peer_choked_timer.is_alive():
131                     self.__peer_choked_timer = threading.Timer(self.__peer_choked_timeout_intvl, PeerConnect.__peer_choked_timeout, [self,])
132                     self.__peer_choked_timer.start()
133         
134            
135         return True
136     
137     def cancel_pieces(self, pieces):
138         with self.__rlock_common:
139             for piece in self.__local_pending_request:
140                 if piece[0] in pieces:
141                     self.__local_pending_request.remove(piece)
142                     
143             if self.__local_requesting:
144                 for piece in self.__local_requesting_pieces:
145                     if piece[0] in pieces:
146                         self.__send_cancel(piece)
147                         self.__local_requesting_pieces.remove(piece)
148                 if len(self.__local_requesting_pieces) == 0:
149                     self.__local_requesting = False
150                     if self.__local_requesting_timer != None:
151                         self.__local_requesting_timer.cancel()      
152                 self.__check_local_request()
153
154         
155     def set_choke_state(self, choked):
156         with self.__rlock_common:
157             if self.__local_choked != choked:
158                 if choked:
159                     self.__send_choked()
160                 else:
161                     self.__send_unchoked()
162                     self.__check_peer_request()
163             pass
164         
165     def get_peer_have_pieces(self):
166         with self.__rlock_common:
167             return self.__peer_have_pieces
168     
169     def notify_local_have_pieces(self, pieces):
170         with self.__rlock_common:
171             self.__send_have(pieces)
172         
173     def is_dead_peer(self):
174         with self.__rlock_common:
175             return  self.__retry_times > self.__max_retrys
176     
177     def get_local_pending_pieces(self):
178         with self.__rlock_common:
179             pieces_index = []
180             for piece in self.__local_pending_request:
181                 if piece[0] not in pieces_index:
182                     pieces_index.append(piece[0])
183             if self.__local_requesting:
184                 for piece in self.__local_requesting_pieces:
185                     if piece[0] not in pieces_index:
186                         pieces_index.append(piece[0])
187
188         return pieces_index
189     
190     def get_peer_addr(self):
191         return (self.__ip, self.__port)
192         
193     def __connect(self):
194
195         self.__tcp_con = socket(AF_INET, SOCK_STREAM)
196         self.__tcp_con.settimeout(self.__tcp_connect_timeout)
197
198         try:
199             self.__tcp_con.connect((self.__ip,self.__port))
200         except error , e:
201             self.__write_log('peer connect error: %s, retry' %e)
202             self.__retry_connect()
203             return
204         
205         self.__tcp_con.settimeout(None)
206         
207         self.__start_send_proc()
208
209         if not self.__handshake():
210             self.__retry_connect()
211             return
212         
213         self.__send_bitfield()
214         self.__send_unchoked()
215         self.__start_keepalive_timer()      
216
217         self.__recv_loop()
218     pass
219
220     def __disconnect(self):
221         self.__write_log('__disconnect:begin')
222         
223         if self.__retry_timer != None:
224             self.__retry_timer.cancel()
225         
226         if self.__keepalive_timer != None:
227             self.__keepalive_timer.cancel()
228            
229         if self.__local_sending_queue != None:
230             self.__local_sending_queue = None
231             self.__local_sending_event.set()
232            
233         if self.__peer_choked_timer != None:
234             self.__peer_choked_timer.cancel()
235            
236         if self.__dispatch_timer != None:
237             self.__dispatch_timer.cancel()   
238
239         
240         if self.__local_requesting:
241             self.__local_requesting = False
242             self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
243             self.__local_requesting_pieces = []
244             if self.__local_requesting_timer != None:
245                 self.__local_requesting_timer.cancel()
246         
247         self.__tcp_con.close()
248         self.__write_log('__disconnect: self.__tcp_con.closed')
249         
250         self.__receiving_cache = ''
251         
252         self.__local_choked = True
253         self.__peer_choked = True
254         self.__local_interested = False
255         self.__peer_interested = False
256         self.__local_requesting_pieces = []
257         self.__peer_pending_request = []
258         self.__peer_have_pieces = []
259         self.__peer_have_pieces_pending = []
260         pass
261     
262     def __start_keepalive_timer(self):
263         if self.__keepalive_timer != None:
264             self.__keepalive_timer.cancel()
265         self.__keepalive_timer = threading.Timer(120,PeerConnect.__send_keepalive_timeout,[self,])
266         self.__keepalive_timer.start()
267         
268     def __send_keepalive_timeout(self):
269         
270         with self.__rlock_common:
271             self.__send_keepalive()
272             self.__start_keepalive_timer()
273         
274     def __recv_loop(self):
275         self.__tcp_con.setblocking(False)
276         while True:
277             ready_r, ready_w, in_err = select.select([self.__tcp_con,], [], [self.__tcp_con,], 600)
278            
279             with self.__rlock_common:
280                 if self.__tcp_con in in_err:
281                     self.__write_log('__recv_loop: socket in error select result:%s' %str(in_err))
282                     self.__retry_connect()
283                     break
284                 
285                 if self.__tcp_con not in ready_r:
286                     self.__write_log('__recv_loop: unexpected select result!')
287                     continue
288                 
289                 try:
290                     received_data = self.__tcp_con.recv(self.__receive_data_len)
291                        
292                 except error, e:
293                     self.__write_log("receive data failed, error:%s, retry" %e)
294                     self.__retry_connect()
295                     break
296            
297                 if len(received_data) == 0:
298                     self.__write_log("have received null data")
299                     self.__retry_connect()
300                     break
301            
302                 self.__reveived_data(received_data)
303         pass
304     
305     def __start_send_proc(self):
306         with self.__rlock_sck_send:
307             self.__local_sending_queue = ''
308         self.__sck_send_thread = threading.Thread(target=PeerConnect.__proc_sending, args=(self,))
309         self.__sck_send_thread.start()
310         
311     def __proc_sending(self):
312         while self.__local_sending_queue != None:
313                        
314             ready_r, ready_w, in_err = select.select([], [self.__tcp_con,], [self.__tcp_con,])
315            
316             self.__local_sending_event.wait()
317
318             with self.__rlock_common:
319                 
320                 if self.__tcp_con  in in_err:
321                     self.__tcp_con.close()
322                     break
323                 
324                 if self.__tcp_con not in ready_w:
325                     self.__write_log('__proc_sending: unexpected select result!')
326                     continue
327                 
328                 if self.__local_sending_queue == None:
329                     break
330                 
331                 try:
332                     sent_len = self.__tcp_con.send(self.__local_sending_queue)
333                     self.__local_sending_queue = self.__local_sending_queue[sent_len:]
334                     
335                 except error,e:
336                     self.__tcp_con.close()
337                     self.__write_log('__proc_sending failed! error:%s' %str(e))
338                     break
339                     
340                 if len(self.__local_sending_queue) == 0:
341                     self.__local_sending_event.clear()
342         pass
343     
344     def __check_peer_request(self):
345         if self.__peer_interested and not self.__local_choked:
346             while len(self.__peer_pending_request) > 0:
347                 piece = self.__peer_pending_request.pop(0)
348                 piece_index, offset, length = piece
349                 if self.__local_have(piece_index):
350                     data = self.__read_piecedata(piece_index,offset, length)
351                     self.__send_piece(piece_index, offset, data)
352                 else:
353                     self.__write_log('peer request piece:%d not have.' %piece_index)
354         pass
355
356     def __check_local_request(self):
357         with self.__rlock_common:
358             self.__check_interested()
359            
360             if self.__local_requesting and len(self.__local_requesting_pieces) >= self.__local_max_requesting:
361                 return
362            
363             if len(self.__local_pending_request) != 0:
364                 if not self.__local_interested:
365                     self.__send_interested()
366             else:
367                 if len(self.__peer_have_pieces) != 0:
368                     if self.__dispatch_timer != None:
369                         self.__dispatch_timer.cancel()
370                        
371                     self.__dispatch_timer = threading.Timer(self.__dispatch_timeout,PeerConnect.__check_local_request ,[self,])
372                     self.__dispatch_timer.start()
373                     self.__local_interested = False
374                     self.__notify_pieces_completed()
375                 return
376            
377             if self.__peer_choked:
378                 return
379         
380             adding_piece = self.__local_pending_request.pop(0)
381             if adding_piece[0] not in self.__peer_have_pieces:
382                 for piece in self.__local_pending_request:
383                     if piece[0] == adding_piece[0]:
384                         self.__local_pending_request.remove(piece)
385                 self.__notify_pieces_canceled([adding_piece[0],])
386                 self.__check_local_request()
387             else:
388                 self.__local_requesting = True
389                 self.__local_requesting_pieces.append(adding_piece)   
390                 self.__send_request(adding_piece)
391                 self.__check_local_request()
392                 
393                 if self.__local_requesting_timer == None or not self.__local_requesting_timer.is_alive():
394                     self.__local_requesting_timer = threading.Timer(self.__local_requesting_timeout_intvl, PeerConnect.__local_requesting_timeout, [self,])
395                     self.__local_requesting_timer.start()
396             pass
397     
398     def __local_requesting_timeout(self):
399         with self.__rlock_common:
400             if self.__local_requesting:
401                 self.__local_requesting = False
402                 self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
403                 self.__local_requesting_pieces = []
404                 self.__local_interested = False
405                 self.__check_local_request()
406
407         pass
408     
409     def __peer_choked_timeout(self):
410         with self.__rlock_common:
411             if self.__peer_choked:
412                 pending_pieces = []
413                 for piece in self.__local_pending_request:
414                     if piece[0] not in pending_pieces:
415                         pending_pieces.append(piece[0])
416                 if len(pending_pieces) != 0:
417                     self.__notify_pieces_canceled(pending_pieces)
418                     self.__local_pending_request = []
419         
420                     
421     
422     def __check_interested(self):
423         if not self.__local_requesting:
424             if len(self.__local_pending_request) != 0 and not self.__local_interested:
425                 self.__send_interested()
426                 
427         if not self.__local_requesting and len(self.__local_pending_request) == 0:
428             if self.__local_interested:
429                 self.__send_notintrested()
430         pass
431                 
432     def __retry_connect(self):
433         
434         self.__write_log('__retry_connect')
435         pending_pieces = []
436         peer_dead = False
437         
438         with self.__rlock_common:
439             self.__disconnect()
440            
441             for piece in self.__local_pending_request:
442                 if piece[0] not in pending_pieces:
443                     pending_pieces.append(piece[0])
444                     
445             self.__retry_times += 1
446             if self.__retry_times > self.__max_retrys:
447                 peer_dead = True
448
449             else:
450                 self.__retry_timer = threading.Timer(self.__retry_intvl**self.__retry_times, PeerConnect.__connect, [self,])
451                 self.__retry_timer.start()
452                 
453         if peer_dead == True:        
454             self.__notify_peer_dead()
455            
456         if len(pending_pieces) != 0:
457             self.__notify_pieces_canceled(pending_pieces)
458             self.__local_pending_request = []
459
460     def __reveived_data(self, data):
461         self.__receiving_cache += data
462            
463         while len(self.__receiving_cache) >= 4:
464             msg_len = _str_ntohl(self.__receiving_cache[0:4])
465            
466             if (len(self.__receiving_cache)-4) >= msg_len:
467                 self.__proc_msg(self.__receiving_cache[4:(4+msg_len)])
468                 self.__receiving_cache = data[4+msg_len:]
469             else:
470                 break
471                     
472                 
473     def __proc_msg(self, msg):
474         with self.__rlock_common:
475             if len(msg) == 0:
476                 self.__received_keepalive()
477             else:
478                 msg_type = msg[0]
479                 if msg_type == chr(0):
480                     self.__received_choked()
481                 elif msg_type == chr(1):
482                     self.__received_unchoked()
483                 elif msg_type == chr(2):
484                     self.__received_interested()
485                 elif msg_type == chr(3):
486                     self.__received_notinterested()
487                 elif msg_type == chr(4):
488                     self.__received_have(msg[1:])                                
489                 elif msg_type == chr(5):
490                     self.__received_bitfield(msg[1:])
491                 elif msg_type == chr(6):
492                     self.__received_request(msg[1:])            
493                 elif msg_type == chr(7):
494                     self.__received_piece(msg[1:])
495                 elif msg_type == chr(8):
496                     self.__received_cancel(msg[1:])
497                 else:
498                     self.__write_log('received unknown msg :%s' %list(msg))
499     
500     def  __handshake(self):
501         self.__write_log('send handshake: %s' %list(self.__handshake_info))
502         self.__tcp_con.sendall(self.__handshake_info)
503         
504         try:
505             self.__tcp_con.settimeout(self.__tcp_handshake_timeout)
506             rsp = self.__tcp_con.recv(68)
507            
508             if len(rsp) != 68:
509                 return False
510            
511             self.__tcp_con.settimeout(None)
512             self.__write_log('recived handshake rsp: %s' %list(rsp))
513             self.__peer_id = rsp[47:67]
514             self.__write_log('peer_id:%s' %self.__peer_id)
515            
516         except (error,timeout), e:
517             self.__write_log('handshake failed, error:%s' %e)
518             return False
519         return True
520     
521     def __received_keepalive(self):
522         self.__write_log('received keepalive')
523         #self.__send_keepalive()
524         pass
525            
526     def __received_choked(self):
527
528         self.__peer_choked = True
529         
530         if self.__local_requesting:
531             self.__local_requesting = False
532             self.__local_pending_request =  self.__local_requesting_pieces + self.__local_pending_request
533             self.__local_requesting_pieces = []
534            
535         self.__notify_peer_choked()
536         
537         if len(self.__local_pending_request) != 0:
538             pending_pieces = []
539             for piece in self.__local_pending_request:
540                 if piece[0] not in pending_pieces:
541                     pending_pieces.append(piece[0])
542             self.__notify_pieces_canceled(pending_pieces)
543             self.__local_pending_request = []
544             self.__local_interested = False
545            
546         self.__write_log('received choked')
547         
548     def __received_unchoked(self):
549         self.__peer_choked = False
550         
551         self.__notify_peer_unchoked()
552         
553         #if len(self.__local_pending_request) < self.__local_pending_request_less:
554         self.__check_local_request()
555         
556         if self.__peer_choked_timer != None:
557             self.__peer_choked_timer.cancel()
558            
559         self.__write_log('received unchoked')
560         
561     
562     def __received_interested(self):
563         self.__peer_interested = True
564         #self.__send_unchoked()
565         self.__write_log('received interested')
566         
567     def __received_notinterested(self):
568         self.__peer_interested = False
569         self.__peer_pending_request = []
570         self.__write_log('received notinterested')
571         
572     def __received_have(self, data):
573         '''
574         TODO:Notify peer have pieces changed
575         '''
576         piece_index = _str_ntohl(data[0:4])
577         if piece_index not in self.__peer_have_pieces:
578             self.__peer_have_pieces.append(piece_index)
579             self.__peer_have_pieces_pending.append(piece_index)
580             if len(self.__local_pending_request) < self.__local_pending_request_less:
581                 self.__notify_pieces_have(self.__peer_have_pieces_pending)
582                 self.__peer_have_pieces_pending = []
583            
584            
585         self.__write_log('received have piece:%d' %piece_index)
586
587     def __received_bitfield(self, data):
588         '''
589         '''
590         bitfield_len = len(data)
591         for i in range(0,bitfield_len):
592             byte = data
593             for j in range(0,8):
594                 byte_mask = 116)%0x100)
799     msg += chr((integer>>8)%0x100)
800     msg += chr(integer%0x100)
801     return msg
802
803 def _str_ntohl(msg):
804     integer = 0
805     integer += ord(msg[0])

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-61129-1-1.html 上篇帖子: 关于在组件GIS开发中使用Python的一点补充说明 下篇帖子: [转]Python 常用包收集
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表