|
与peer建立tcp连接后,首先发送handshake消息进行握手
handshake消息格式如下:
一个字节19(即0x13,协议字符串的长度) + 一个字符串'BitTorrent protocol' + 8 byte 保留字节默认值为0(draft中对保留字节有定义)
+ 种子文件中info 部分的sha1值,大小为20个字节 + 20个字节的peer id(从tracker获取到的peer信息大多没有peerid,这里可以使用本地的peer id)
如果handshake信息协商不上,tcp连接将被关闭。
BT标准BEP-3中定义了9种peer消息(类型0~8):消息格式为msg_len(4个字节) + msg_type(1个字节) + payload
0 - choke --发送该消息表示本端发送阻塞,对端将不能获取到piece数据,payload 为 0
1 - unchoke --发送该消息表示解阻塞,对端可以开始发送请求获取piece数据,payload 为 0
2 - interested --发送该消息,表示对对端的pieces数据有兴趣,payload 为 0
3 - not interested ---发送该消息,表示对对端的pieces数据没有兴趣了,payload 为 0
4 - have ---发送该消息,通告对端 本端拥有的pieces,payload为4个字节的piece index
5 - bitfield ---发送该消息,通告对端 本端拥有的pieces,为bit map的方式表示每个piece index在本端是否拥有。piece index所在bit位为1,表示拥有。
该消息为handshake成功后的第一个消息。
6 - request ---piece请求消息,payload为: index, begin, and length,都是4个字节表示,length一般实现为0x8000, 最大不能超过0x20000。
7 - piece ---piece 数据,payload为: index, begin,data
8 - cancel ---发送该消息,表示本端取消了某个piece请求。payload为:index, begin, and length
使用python的异步socket接口实现,为了减少处理过程被socket阻塞,使用多个线程处理每个peer。
每个peer包括3个线程:request timeout timer ,socket send data thread, socket receive data thread,使用select 函数判断socket是否可读、可写。
对socket读写操作时使用RLock进行保护,select阻塞进程时不加锁,避免阻塞其他线程。
发送数据时先写入一个队列,然后通过set threading.Event 变量触发socket send data thread发送数据,保证发送数据的线程不阻塞
由于 python没有结束线程的接口,socket send data thread, socket receive data thread 需要依赖特殊变量的赋值,使socket处理进程结束。
使用同步调用来触发下载过程运转,尽量不使用timer轮询的方式,可以降低cpu使用率并加快下载过程。
但是,多线程间的同步调用由于锁的存在,会导致性能下降并容易引入信号量死锁的问题。需要仔细设计好多线程的运行轨迹避免死锁。
draft BEP中定义的功能暂未实现,peer的上传流控未实现,peer质量分级未实现。
PeerConnect
1 '''
2 Created on 2012-10-3
3
4 @author: ddt
5 '''
6 from socket import *
7 import threading
8 import log_info
9 import select
10
11 class PeerConnect(object):
12 '''
13 TODO: upload flow control
14 TODO: peer quality management
15 '''
def __init__(self, ip, port, task_info):
    """Prepare the state for one peer connection; nothing is connected yet.

    ip, port  -- address of the remote peer.
    task_info -- task object; must provide get_info_hash() and
                 get_local_id(), whose results are concatenated into the
                 handshake message.
    """
    # --- remote endpoint and owning task ---------------------------------
    self.__ip, self.__port = ip, port
    self.__info_hash = task_info.get_info_hash()
    self.__local_id = task_info.get_local_id()
    self.__task_info = task_info

    # Handshake: length byte 19, protocol name, 8 reserved zero bytes,
    # then the info hash and the local peer id.
    self.__handshake_info = ''.join((
        chr(19), 'BitTorrent protocol',
        chr(0) * 8,
        self.__info_hash, self.__local_id,
    ))

    # --- sizes -----------------------------------------------------------
    self.__request_piece_len = 0x4000   # length of each block request
    self.__receive_data_len = 0x8000    # max bytes asked of one recv()

    # --- socket and timeouts (seconds) -----------------------------------
    self.__tcp_con = None
    self.__tcp_connect_timeout = 60
    self.__tcp_handshake_timeout = 60

    self.__keepalive_timer = None
    self.__sck_send_thread = None

    # --- reconnect policy: delay is __retry_intvl ** __retry_times -------
    self.__retry_timer = None
    self.__retry_intvl = 2
    self.__retry_times = 0
    self.__max_retrys = 10

    # --- choke / interest flags ------------------------------------------
    self.__local_choked = True       # we are choking the peer
    self.__peer_choked = True        # the peer is choking us
    self.__local_interested = False
    self.__peer_interested = False

    # Piece indexes the peer has announced.
    self.__peer_have_pieces = []

    # --- requests in flight to the peer ----------------------------------
    self.__local_requesting = False
    self.__local_requesting_pieces = []
    self.__local_max_requesting = 10            # cap on in-flight requests
    self.__local_requesting_timer = None
    self.__local_requesting_timeout_intvl = 30  # seconds

    # Bytes received but not yet split into complete messages.
    self.__receiving_cache = ''

    # --- queued (index, offset, length) requests -------------------------
    self.__peer_pending_request = []    # asked of us by the peer
    self.__local_pending_request = []   # we still have to send

    # "have" announcements are forwarded only while fewer than this many
    # local requests are pending.
    self.__local_pending_request_less = 10
    self.__peer_have_pieces_pending = []

    # --- sending side ----------------------------------------------------
    self.__local_sending_queue = None   # None means no sender is running
    self.__rlock_sck_send = threading.RLock()
    self.__local_sending_event = threading.Event()
    self.__min_sck_send_msg = 0x1000

    # Lock guarding the shared state of this object.
    self.__rlock_common = threading.RLock()

    self.__peer_started = False

    # --- timers for a choking peer and for re-dispatching (seconds) ------
    self.__peer_choked_timer = None
    self.__peer_choked_timeout_intvl = 180

    self.__dispatch_timer = None
    self.__dispatch_timeout = 5
84
def start(self):
    """Start connecting to the peer in a new thread; no-op if already started."""
    with self.__rlock_common:
        if self.__peer_started:
            return

        # A fresh start gets the full retry budget again.
        self.__retry_times = 0
        self.__startup_thread = threading.Thread(target=PeerConnect.__connect,
                                                 args=(self,))
        self.__peer_started = True
        self.__startup_thread.start()
92
def stop(self):
    """Disconnect from the peer; no-op if the peer is not started."""
    with self.__rlock_common:
        if not self.__peer_started:
            return

        # Use up the retry budget so that the next failure seen by the
        # retry logic pushes the counter past the maximum.
        self.__retry_times = self.__max_retrys
        self.__disconnect()
        self.__peer_started = False
100
def dispatch_pieces(self, pieces, piece_len):
    """Queue block requests for the given pieces on this peer.

    pieces    -- piece indexes to download from this peer.
    piece_len -- length of a piece; each piece is split into requests of
                 at most __request_piece_len bytes.

    Returns False (and queues nothing) if `pieces` is empty or contains an
    index the peer has not announced; True otherwise.
    """
    with self.__rlock_common:
        self.__write_log(str(pieces))
        self.__write_log(str(self.__peer_have_pieces))

        if not pieces:
            return False
        if any(index not in self.__peer_have_pieces for index in pieces):
            return False

        block_len = self.__request_piece_len
        for index in pieces:
            for begin in range(0, piece_len, block_len):
                # The last block of a piece may be shorter.
                request = (index, begin, min(block_len, piece_len - begin))
                if request not in self.__local_pending_request:
                    self.__local_pending_request.append(request)

        if self.__dispatch_timer is not None:
            self.__dispatch_timer.cancel()

        self.__check_local_request()

        # While the peer chokes us, make sure a timeout is running that
        # hands the pieces back if the peer never unchokes.
        if self.__peer_choked:
            timer = self.__peer_choked_timer
            if timer is None or not timer.is_alive():
                self.__peer_choked_timer = threading.Timer(self.__peer_choked_timeout_intvl, PeerConnect.__peer_choked_timeout, [self,])
                self.__peer_choked_timer.start()

        return True
136
def cancel_pieces(self, pieces):
    """Drop every queued or in-flight request that belongs to `pieces`.

    pieces -- collection of piece indexes to cancel.

    Requests not sent yet are simply forgotten; for requests already sent
    a cancel message is sent to the peer as well.
    """
    with self.__rlock_common:
        # Rebuild the list in place instead of calling remove() while
        # iterating, which skipped the element following each removal.
        self.__local_pending_request[:] = [
            request for request in self.__local_pending_request
            if request[0] not in pieces
        ]

        if self.__local_requesting:
            # Iterate over a copy so removing from the real list is safe.
            for request in list(self.__local_requesting_pieces):
                if request[0] in pieces:
                    self.__send_cancel(request)
                    self.__local_requesting_pieces.remove(request)

            if not self.__local_requesting_pieces:
                # Nothing is in flight any more, so the request timeout
                # has nothing left to watch.
                self.__local_requesting = False
                if self.__local_requesting_timer is not None:
                    self.__local_requesting_timer.cancel()

            # Request slots may have been freed; refill them.
            self.__check_local_request()
153
154
def set_choke_state(self, choked):
    """Choke (True) or unchoke (False) the peer if that changes the current state."""
    with self.__rlock_common:
        if self.__local_choked == choked:
            return

        if choked:
            self.__send_choked()
            return

        # After unchoking, serve whatever the peer has already asked for.
        self.__send_unchoked()
        self.__check_peer_request()
164
def get_peer_have_pieces(self):
    """Return a snapshot list of the piece indexes the peer has announced.

    A copy is returned rather than the internal list: the internal list is
    modified by other methods of this object, and the lock only protects
    it for the duration of this call.
    """
    with self.__rlock_common:
        return list(self.__peer_have_pieces)
168
def notify_local_have_pieces(self, pieces):
    """Announce to the peer, under the common lock, that `pieces` are now held locally."""
    lock = self.__rlock_common
    with lock:
        self.__send_have(pieces)
172
def is_dead_peer(self):
    """Return True once the retry counter has gone past the allowed maximum."""
    with self.__rlock_common:
        retries_exhausted = self.__max_retrys < self.__retry_times
        return retries_exhausted
176
def get_local_pending_pieces(self):
    """Return the distinct piece indexes this peer is still working on.

    Indexes come from the queued requests first and, while requests are
    in flight, from those as well; each index appears once, in the order
    it was first seen.
    """
    with self.__rlock_common:
        request_lists = [self.__local_pending_request]
        if self.__local_requesting:
            request_lists.append(self.__local_requesting_pieces)

        indexes = []
        for requests in request_lists:
            for request in requests:
                index = request[0]
                if index not in indexes:
                    indexes.append(index)
        return indexes
189
def get_peer_addr(self):
    """Return the peer's address as an (ip, port) tuple."""
    address = self.__ip, self.__port
    return address
192
def __connect(self):
    """Connect, handshake, announce ourselves, then run the receive loop.

    Runs in its own thread (or in a retry timer thread). A failed connect
    or handshake is handed to the retry logic and this call returns.
    """
    self.__tcp_con = socket(AF_INET, SOCK_STREAM)
    self.__tcp_con.settimeout(self.__tcp_connect_timeout)

    peer_address = (self.__ip, self.__port)
    try:
        self.__tcp_con.connect(peer_address)
    except error as e:
        self.__write_log('peer connect error: %s, retry' %e)
        self.__retry_connect()
        return

    # Connected: drop the connect timeout again.
    self.__tcp_con.settimeout(None)

    self.__start_send_proc()

    handshake_ok = self.__handshake()
    if not handshake_ok:
        self.__retry_connect()
        return

    self.__send_bitfield()
    self.__send_unchoked()
    self.__start_keepalive_timer()

    # Does not return until the receive loop ends.
    self.__recv_loop()
219
def __disconnect(self):
    """Close the connection and reset all per-connection state.

    Cancels the timers, tells the sending thread to finish (by setting the
    sending queue to None and waking it), moves in-flight requests back to
    the front of the pending queue, closes the socket and resets the
    choke/interest flags and the peer's piece information.

    Safe to call before any socket has been created.
    """
    self.__write_log('__disconnect:begin')

    if self.__retry_timer is not None:
        self.__retry_timer.cancel()

    if self.__keepalive_timer is not None:
        self.__keepalive_timer.cancel()

    if self.__local_sending_queue is not None:
        # A None queue is the exit signal for the sending thread; the
        # event wakes it up so that it notices.
        self.__local_sending_queue = None
        self.__local_sending_event.set()

    if self.__peer_choked_timer is not None:
        self.__peer_choked_timer.cancel()

    if self.__dispatch_timer is not None:
        self.__dispatch_timer.cancel()

    if self.__local_requesting:
        # Requests that were in flight must be asked for again later.
        self.__local_requesting = False
        self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
        self.__local_requesting_pieces = []
        if self.__local_requesting_timer is not None:
            self.__local_requesting_timer.cancel()

    # The socket only exists once the connect routine has run; stop() may
    # get here earlier, which used to raise AttributeError on None.
    if self.__tcp_con is not None:
        self.__tcp_con.close()
        self.__write_log('__disconnect: self.__tcp_con.closed')

    self.__receiving_cache = ''

    self.__local_choked = True
    self.__peer_choked = True
    self.__local_interested = False
    self.__peer_interested = False
    self.__local_requesting_pieces = []
    self.__peer_pending_request = []
    self.__peer_have_pieces = []
    self.__peer_have_pieces_pending = []
261
def __start_keepalive_timer(self):
    """(Re)start the 120 second timer that triggers the next keepalive."""
    previous = self.__keepalive_timer
    if previous is not None:
        previous.cancel()

    self.__keepalive_timer = threading.Timer(120,PeerConnect.__send_keepalive_timeout,[self,])
    self.__keepalive_timer.start()
267
def __send_keepalive_timeout(self):
    """Keepalive timer callback: send a keepalive and arm the timer again."""
    lock = self.__rlock_common
    with lock:
        self.__send_keepalive()
        self.__start_keepalive_timer()
273
def __recv_loop(self):
    """Receive data from the peer until the connection fails.

    select() is called without holding the lock; the socket is read and
    the data processed while holding it. Any socket error, or the peer
    closing the connection, is handed to the retry logic and ends the loop.
    """
    self.__tcp_con.setblocking(False)

    while True:
        # Wait (up to 600 s) without the lock so other threads can run.
        watched = [self.__tcp_con,]
        readable, _writable, failed = select.select(watched, [], watched, 600)

        with self.__rlock_common:
            if self.__tcp_con in failed:
                self.__write_log('__recv_loop: socket in error select result:%s' %str(failed))
                self.__retry_connect()
                break

            if self.__tcp_con not in readable:
                self.__write_log('__recv_loop: unexpected select result!')
                continue

            try:
                received_data = self.__tcp_con.recv(self.__receive_data_len)
            except error as e:
                self.__write_log("receive data failed, error:%s, retry" %e)
                self.__retry_connect()
                break

            # recv() returning nothing means the peer closed the connection.
            if not received_data:
                self.__write_log("have received null data")
                self.__retry_connect()
                break

            self.__reveived_data(received_data)
304
def __start_send_proc(self):
    """Create an empty sending queue and start the thread that drains it."""
    with self.__rlock_sck_send:
        # An empty string (rather than None) marks the sender as running.
        self.__local_sending_queue = ''
        sender = threading.Thread(target=PeerConnect.__proc_sending, args=(self,))
        self.__sck_send_thread = sender
        sender.start()
310
def __proc_sending(self):
    """Sending thread body: push the sending queue out over the socket.

    Loops until the sending queue is set to None or the socket fails.
    Each round waits for the socket to become writable, then for the
    sending event, and then sends as much of the queue as the socket takes.
    """
    while self.__local_sending_queue is not None:

        # Both waits happen without holding the lock.
        _readable, writable, failed = select.select([], [self.__tcp_con,], [self.__tcp_con,])
        self.__local_sending_event.wait()

        with self.__rlock_common:

            if self.__tcp_con in failed:
                self.__tcp_con.close()
                break

            if self.__tcp_con not in writable:
                self.__write_log('__proc_sending: unexpected select result!')
                continue

            # The queue may have been cleared while we were waiting.
            if self.__local_sending_queue is None:
                break

            try:
                sent_len = self.__tcp_con.send(self.__local_sending_queue)
                self.__local_sending_queue = self.__local_sending_queue[sent_len:]
            except error as e:
                self.__tcp_con.close()
                self.__write_log('__proc_sending failed! error:%s' %str(e))
                break

            # Everything is out: sleep until new data is queued.
            if not self.__local_sending_queue:
                self.__local_sending_event.clear()
343
def __check_peer_request(self):
    """Answer the peer's queued requests, if it is interested and not choked by us.

    Every queued request is consumed; requests for pieces that are not
    available locally are only logged.
    """
    if not self.__peer_interested or self.__local_choked:
        return

    while self.__peer_pending_request:
        piece_index, offset, length = self.__peer_pending_request.pop(0)
        if not self.__local_have(piece_index):
            self.__write_log('peer request piece:%d not have.' %piece_index)
            continue
        data = self.__read_piecedata(piece_index,offset, length)
        self.__send_piece(piece_index, offset, data)
355
def __check_local_request(self):
    """Move queued requests onto the wire while request slots are free.

    Also used as the dispatch timer callback. Each call sends at most one
    request and then calls itself again, so the in-flight window fills up
    to __local_max_requesting.

    NOTE(review): the listing this method comes from had lost its
    indentation; the nesting below is reconstructed and should be checked
    against the original source.
    """
    with self.__rlock_common:
        self.__check_interested()

        # The in-flight window is full.
        if self.__local_requesting and len(self.__local_requesting_pieces) >= self.__local_max_requesting:
            return

        if len(self.__local_pending_request) != 0:
            if not self.__local_interested:
                self.__send_interested()
        else:
            # Nothing left to request. If the peer has pieces, report
            # completion and look again after the dispatch timeout.
            if len(self.__peer_have_pieces) != 0:
                if self.__dispatch_timer is not None:
                    self.__dispatch_timer.cancel()

                self.__dispatch_timer = threading.Timer(self.__dispatch_timeout,PeerConnect.__check_local_request ,[self,])
                self.__dispatch_timer.start()
                self.__local_interested = False
                self.__notify_pieces_completed()
            return

        # Requests cannot be sent while the peer chokes us.
        if self.__peer_choked:
            return

        adding_piece = self.__local_pending_request.pop(0)
        if adding_piece[0] not in self.__peer_have_pieces:
            # The peer lacks this piece: drop ALL queued requests for it.
            # Filtering in place replaces a remove()-while-iterating loop
            # that skipped entries and left stale requests in the queue.
            self.__local_pending_request[:] = [
                request for request in self.__local_pending_request
                if request[0] != adding_piece[0]
            ]
            self.__notify_pieces_canceled([adding_piece[0],])
            self.__check_local_request()
        else:
            self.__local_requesting = True
            self.__local_requesting_pieces.append(adding_piece)
            self.__send_request(adding_piece)
            self.__check_local_request()

            # Make sure a timeout is watching the in-flight requests.
            if self.__local_requesting_timer is None or not self.__local_requesting_timer.is_alive():
                self.__local_requesting_timer = threading.Timer(self.__local_requesting_timeout_intvl, PeerConnect.__local_requesting_timeout, [self,])
                self.__local_requesting_timer.start()
397
398 def __local_requesting_timeout(self):
# Callback of the request timer: the requests in flight were not completed
# in time. They are put back at the front of the pending queue, the local
# "interested" flag is cleared and the request check is run again.
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether the last two statements belong inside the
# `if self.__local_requesting` branch - confirm against the original file.
399 with self.__rlock_common:
400 if self.__local_requesting:
401 self.__local_requesting = False
# In-flight requests go first so that they are retried first.
402 self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
403 self.__local_requesting_pieces = []
404 self.__local_interested = False
405 self.__check_local_request()
406
407 pass
408
def __peer_choked_timeout(self):
    """Choke timer callback: hand back queued pieces if the peer still chokes us."""
    with self.__rlock_common:
        if not self.__peer_choked:
            return

        # Distinct piece indexes of the queued requests, in queue order.
        stalled_pieces = []
        for request in self.__local_pending_request:
            index = request[0]
            if index not in stalled_pieces:
                stalled_pieces.append(index)

        if stalled_pieces:
            self.__notify_pieces_canceled(stalled_pieces)
            self.__local_pending_request = []
419
420
421
def __check_interested(self):
    """Bring the "interested" state sent to the peer in line with the queue.

    Does nothing while requests are in flight. Otherwise sends
    "interested" when requests are queued and we are not yet interested,
    and "not interested" when nothing is queued but we still are.
    """
    if self.__local_requesting:
        return

    if len(self.__local_pending_request) != 0:
        if not self.__local_interested:
            self.__send_interested()
    elif self.__local_interested:
        self.__send_notintrested()
431
432 def __retry_connect(self):
# Handle a failed or lost connection: disconnect, count the retry, and
# either schedule a reconnect or report the peer as dead. Pieces that were
# still queued on this peer are reported as canceled.
# NOTE(review): indentation was lost in this listing. The local flags
# `peer_dead` and `pending_pieces` suggest the notifications at the end run
# after the lock is released, but the extent of the `with` block cannot be
# told from here - confirm against the original file.
433
434 self.__write_log('__retry_connect')
435 pending_pieces = []
436 peer_dead = False
437
438 with self.__rlock_common:
439 self.__disconnect()
440
# Collect the distinct piece indexes of the queued requests.
441 for piece in self.__local_pending_request:
442 if piece[0] not in pending_pieces:
443 pending_pieces.append(piece[0])
444
445 self.__retry_times += 1
446 if self.__retry_times > self.__max_retrys:
447 peer_dead = True
448
449 else:
# Exponential backoff: the delay is __retry_intvl ** __retry_times seconds.
450 self.__retry_timer = threading.Timer(self.__retry_intvl**self.__retry_times, PeerConnect.__connect, [self,])
451 self.__retry_timer.start()
452
453 if peer_dead == True:
454 self.__notify_peer_dead()
455
456 if len(pending_pieces) != 0:
457 self.__notify_pieces_canceled(pending_pieces)
458 self.__local_pending_request = []
459
def __reveived_data(self, data):
    """Add received bytes to the cache and process every complete message.

    data -- bytes just read from the socket.

    Each message is a 4-byte length prefix followed by that many bytes.
    An incomplete trailing message stays in the cache until more data
    arrives.
    """
    self.__receiving_cache += data

    while len(self.__receiving_cache) >= 4:
        msg_len = _str_ntohl(self.__receiving_cache[0:4])
        frame_end = 4 + msg_len

        if len(self.__receiving_cache) < frame_end:
            # The message body has not fully arrived yet.
            break

        self.__proc_msg(self.__receiving_cache[4:frame_end])
        # Consume the message from the CACHE. Slicing `data` here (as the
        # code used to) is wrong whenever the cache held leftover bytes
        # from an earlier read, or after the first message of a batch.
        self.__receiving_cache = self.__receiving_cache[frame_end:]
471
472
def __proc_msg(self, msg):
    """Dispatch one message (without its length prefix) to its handler.

    msg -- message type byte followed by the payload; an empty message is
           a keepalive. Unknown types are logged and ignored.
    """
    with self.__rlock_common:
        if not msg:
            self.__received_keepalive()
            return

        msg_type = msg[0]
        payload = msg[1:]

        # Types 0-3 carry no payload.
        if msg_type == chr(0):
            self.__received_choked()
        elif msg_type == chr(1):
            self.__received_unchoked()
        elif msg_type == chr(2):
            self.__received_interested()
        elif msg_type == chr(3):
            self.__received_notinterested()
        # Types 4-8 are handed their payload.
        elif msg_type == chr(4):
            self.__received_have(payload)
        elif msg_type == chr(5):
            self.__received_bitfield(payload)
        elif msg_type == chr(6):
            self.__received_request(payload)
        elif msg_type == chr(7):
            self.__received_piece(payload)
        elif msg_type == chr(8):
            self.__received_cancel(payload)
        else:
            self.__write_log('received unknown msg :%s' %list(msg))
499
def __handshake(self):
    """Exchange handshakes with the peer.

    Sends the local handshake and reads the peer's 68-byte reply:
    1 length byte + 19 bytes protocol name + 8 reserved bytes
    + 20 bytes info hash (offset 28) + 20 bytes peer id (offset 48).

    Returns True on success. Returns False if sending or receiving fails
    or times out, if the peer closes the connection before 68 bytes have
    arrived, or if the peer's info hash differs from ours.
    """
    self.__write_log('send handshake: %s' %list(self.__handshake_info))

    try:
        self.__tcp_con.settimeout(self.__tcp_handshake_timeout)
        # Sending is inside the try block so that a send failure goes
        # through the normal "handshake failed" path instead of escaping.
        self.__tcp_con.sendall(self.__handshake_info)

        # recv() may return fewer bytes than asked for; keep reading
        # until the whole handshake is in or the peer closes.
        rsp = self.__tcp_con.recv(68)
        while 0 < len(rsp) < 68:
            chunk = self.__tcp_con.recv(68 - len(rsp))
            if len(chunk) == 0:
                break
            rsp += chunk

        if len(rsp) != 68:
            return False

        self.__tcp_con.settimeout(None)
        self.__write_log('recived handshake rsp: %s' %list(rsp))

        # A peer answering with another info hash serves a different
        # torrent. (Assumes __info_hash is the raw 20-byte hash, as it is
        # placed verbatim into our own handshake.)
        if rsp[28:48] != self.__info_hash:
            self.__write_log('handshake failed, info_hash mismatch')
            return False

        # The peer id is the last 20 bytes, i.e. offsets 48..67.
        self.__peer_id = rsp[48:68]
        self.__write_log('peer_id:%s' %self.__peer_id)

    except (error, timeout) as e:
        self.__write_log('handshake failed, error:%s' %e)
        return False
    return True
520
def __received_keepalive(self):
    """Handle a keepalive from the peer: it is only logged, no reply is sent."""
    self.__write_log('received keepalive')
    return None
525
526 def __received_choked(self):
# Handle a "choke" message: the peer will not serve our requests for now.
# In-flight requests are put back at the front of the pending queue, the
# choke is reported, and all pieces still queued on this peer are reported
# as canceled and the queue is emptied.
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether `self.__local_interested = False` belongs inside the
# `if len(self.__local_pending_request) != 0` branch - confirm against the
# original file.
527
528 self.__peer_choked = True
529
530 if self.__local_requesting:
531 self.__local_requesting = False
532 self.__local_pending_request = self.__local_requesting_pieces + self.__local_pending_request
533 self.__local_requesting_pieces = []
534
535 self.__notify_peer_choked()
536
537 if len(self.__local_pending_request) != 0:
# Collect the distinct piece indexes of the queued requests.
538 pending_pieces = []
539 for piece in self.__local_pending_request:
540 if piece[0] not in pending_pieces:
541 pending_pieces.append(piece[0])
542 self.__notify_pieces_canceled(pending_pieces)
543 self.__local_pending_request = []
544 self.__local_interested = False
545
546 self.__write_log('received choked')
547
def __received_unchoked(self):
    """Handle an "unchoke" message: the peer now accepts our requests."""
    self.__peer_choked = False

    self.__notify_peer_unchoked()

    # Start sending whatever is queued.
    self.__check_local_request()

    # The peer is no longer choking us, so the choke timeout is obsolete.
    choke_timer = self.__peer_choked_timer
    if choke_timer is not None:
        choke_timer.cancel()

    self.__write_log('received unchoked')
560
561
def __received_interested(self):
    """Handle an "interested" message: remember that the peer wants our data."""
    self.__peer_interested = True
    self.__write_log('received interested')
566
def __received_notinterested(self):
    """Handle a "not interested" message: clear the flag and the peer's queued requests."""
    self.__peer_interested, self.__peer_pending_request = False, []
    self.__write_log('received notinterested')
571
572 def __received_have(self, data):
# Handle a "have" message: the first 4 bytes of `data` are decoded with
# _str_ntohl into the index of a piece the peer now has.
# A new index is recorded in the peer's piece list and in a list of
# announcements not yet forwarded; that list is passed on (and replaced by
# a fresh one) while fewer than __local_pending_request_less local
# requests are queued.
# NOTE(review): indentation was lost in this listing, so it cannot be told
# from here whether the forwarding step is nested inside the
# `if piece_index not in ...` branch - confirm against the original file.
573 '''
574 TODO:Notify peer have pieces changed
575 '''
576 piece_index = _str_ntohl(data[0:4])
577 if piece_index not in self.__peer_have_pieces:
578 self.__peer_have_pieces.append(piece_index)
579 self.__peer_have_pieces_pending.append(piece_index)
580 if len(self.__local_pending_request) < self.__local_pending_request_less:
581 self.__notify_pieces_have(self.__peer_have_pieces_pending)
582 self.__peer_have_pieces_pending = []
583
584
585 self.__write_log('received have piece:%d' %piece_index)
586
587 def __received_bitfield(self, data):
588 '''
589 '''
590 bitfield_len = len(data)
591 for i in range(0,bitfield_len):
592 byte = data
593 for j in range(0,8):
594 byte_mask = 116)%0x100)
799 msg += chr((integer>>8)%0x100)
800 msg += chr(integer%0x100)
801 return msg
802
803 def _str_ntohl(msg):
804 integer = 0
805 integer += ord(msg[0]) |
|