|
大家都知道百度云网盘上有很多分享的资源,包括软件、各类视频自学教程、电子书、甚至各种电影、BT种子应有尽有,但百度云却没有提供相应的搜索功能。个人平时要找一些软件、美剧觉得非常蛋疼。于是就尝试开发一个百度云资源的搜索系统。
资源爬虫思路:
搜索引擎么最重要的就是有海量的资源了,有了资源,只要再基于资源实现全文检索功能就是一个简单的搜索引擎了。首先我需要爬取百度云的分享资源,爬取思路,打开任意一个百度云分享者的主页yun.baidu.com/share/home?uk=xxxxxx&view=share#category/type=0,你可以发现分享者有订阅者和粉丝,你可以递归遍历订阅者和粉丝,从而获得大量分享者uk,进而获得大量的分享资源。
系统实现环境:
语言:python
操作系统:Linux
其他中间件:nginx mysql sphinx
系统包括几个独立的部分:
1、基于requests实现的独立资源爬虫
2、基于开源全文检索引擎sphinx实现的资源索引程序
3、基于Django+bootstrap3开发的简易网站,网站搭建采用nginx1.8+fastCGI(flup)+python。演示网站http://www.itjujiao.com
PS:
目前爬虫爬取了4000W左右的数据,sphinx对内存的要求实在太大了,巨坑。
百度会对爬虫做ip限制,写了个简单的xicidaili代理采集程序,requests可以配置http代理。
分词是sphinx自带的实现,支持中文分词,中文基于一元分词,有点过度分词,分词效果不是特别理想,比如我搜关键词“叶问3”出现的结果中会有“叶子的问题第3版”,不符合预期。英文分词有很多可以改善的地方,比如我搜xart不会出现x-art的结果,而实际上x-art却也是我想要的结果集(你们懂的)。
数据库是mysql,资源表,考虑单表记录上限,分了10个表。第一次爬完sphinx做全量索引,后续做增量索引。
后续优化:
1、分词处理,目前分词搜索结果不是很理想,有大神可以指点下思路。比如我检索“功夫熊猫之卷轴的秘密”,一个结果都没有。而检索“功夫熊猫“有结果集(功丶夫熊猫⒊英语中英字幕.mp4,功丶夫熊猫2.Kung.Fu.Panda.2.2011.BDrip.720P.国粤英台四语.特效中英字幕.mp4,功丶夫熊猫3(韩版)2016.高清中字.mkv等)或搜索”卷轴的秘密“有结果集([美国]功夫潘达之卷轴的秘密.2016.1080p.mp4,g夫熊猫之卷轴的秘密.HD1280超清中英双字.mp4等)
2、数据去重,目前发现抓取的数据很多是共享资源,后续考虑基于MD5去重
爬虫部分实现代码(只是思路代码有点乱):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
| #coding: utf8
import re
import urllib2
import time
from Queue import Queue
import threading, errno, datetime
import json
import requests
import MySQLdb as mdb
DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASS = ''
re_start = re.compile(r'start=(\d+)')
re_uid = re.compile(r'query_uk=(\d+)')
re_pptt = re.compile(r'&pptt=(\d+)')
re_urlid = re.compile(r'&urlid=(\d+)')
ONEPAGE = 20
ONESHAREPAGE = 20
URL_SHARE ='http://yun.baidu.com/pcloud/feed/getsharelist?auth_type=1&start={start}&limit=20&query_uk={uk}&urlid={id}'
URL_FOLLOW ='http://yun.baidu.com/pcloud/friend/getfollowlist?query_uk={uk}&limit=20&start={start}&urlid={id}'
URL_FANS ='http://yun.baidu.com/pcloud/friend/getfanslist?query_uk={uk}&limit=20&start={start}&urlid={id}'
QNUM = 1000
hc_q = Queue(20)
hc_r = Queue(QNUM)
success = 0
failed = 0
PROXY_LIST = [[0, 10,"42.121.33.160", 809, "", "", 0],
[5, 0, "218.97.195.38", 81, "", "", 0],
]
def req_worker(inx):
s =requests.Session()
whileTrue:
req_item = hc_q.get()
req_type = req_item[0]
url = req_item[1]
r= s.get(url)
hc_r.put((r.text, url))
print "req_worker#", inx, url
def response_worker():
dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun',charset='utf8')
dbcurr = dbconn.cursor()
dbcurr.execute('SET NAMES utf8')
dbcurr.execute('set global wait_timeout=60000')
whileTrue:
metadata, effective_url = hc_r.get()
#print "response_worker:", effective_url
try:
tnow = int(time.time())
id = re_urlid.findall(effective_url)[0]
start = re_start.findall(effective_url)[0]
if True:
if 'getfollowlist' in effective_url: #type = 1
follows =json.loads(metadata)
uid =re_uid.findall(effective_url)[0]
if "total_count"in follows.keys() and follows["total_count"]>0 and str(start) =="0":
for i inrange((follows["total_count"]-1)/ONEPAGE):
try:
dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status)VALUES(%s, %s, %s, 1, 0)' % (uid, str(ONEPAGE*(i+1)), str(ONEPAGE)))
except Exception asex:
print"E1", str(ex)
pass
if "follow_list"in follows.keys():
for item infollows["follow_list"]:
try:
dbcurr.execute('INSERT INTO user(userid, username, files, status,downloaded, lastaccess) VALUES(%s, "%s", 0, 0, 0, %s)' %(item['follow_uk'], item['follow_uname'], str(tnow)))
except Exception asex:
print"E13", str(ex)
pass
else:
print "delete1", uid, start
dbcurr.execute('deletefrom urlids where uk=%s and type=1 and start>%s' % (uid, start))
elif 'getfanslist' in effective_url: #type = 2
fans = json.loads(metadata)
uid =re_uid.findall(effective_url)[0]
if "total_count"in fans.keys() and fans["total_count"]>0 and str(start) =="0":
for i inrange((fans["total_count"]-1)/ONEPAGE):
try:
dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status)VALUES(%s, %s, %s, 2, 0)' % (uid, str(ONEPAGE*(i+1)), str(ONEPAGE)))
except Exception asex:
print"E2", str(ex)
pass
if "fans_list" infans.keys():
for item infans["fans_list"]:
try:
dbcurr.execute('INSERTINTO user(userid, username, files, status, downloaded, lastaccess) VALUES(%s,"%s", 0, 0, 0, %s)' % (item['fans_uk'], item['fans_uname'],str(tnow)))
except Exception asex:
print "E23",str(ex)
pass
else:
print "delete2", uid, start
dbcurr.execute('deletefrom urlids where uk=%s and type=2 and start>%s' % (uid, start))
else:
shares =json.loads(metadata)
uid =re_uid.findall(effective_url)[0]
if "total_count"in shares.keys() and shares["total_count"]>0 and str(start) =="0":
for i inrange((shares["total_count"]-1)/ONESHAREPAGE):
try:
dbcurr.execute('INSERT INTO urlids(uk, start, limited, type, status)VALUES(%s, %s, %s, 0, 0)' % (uid, str(ONESHAREPAGE*(i+1)), str(ONESHAREPAGE)))
except Exception asex:
print"E3", str(ex)
pass
if "records" inshares.keys():
for item inshares["records"]:
try:
dbcurr.execute('INSERT INTO share(userid, filename, shareid, status)VALUES(%s, "%s", %s, 0)' % (uid, item['title'], item['shareid']))
except Exception asex:
#print"E33", str(ex), item
pass
else:
print "delete0", uid, start
dbcurr.execute('deletefrom urlids where uk=%s and type=0 and start>%s' % (uid, str(start)))
dbcurr.execute('delete from urlids where id=%s' % (id, ))
dbconn.commit()
except Exception as ex:
print "E5", str(ex), id
pid = re_pptt.findall(effective_url)
if pid:
print "pid>>>", pid
ppid = int(pid[0])
PROXY_LIST[ppid][6] -= 1
dbcurr.close()
dbconn.close()
def worker():
global success, failed
dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, 'baiduyun',charset='utf8')
dbcurr = dbconn.cursor()
dbcurr.execute('SET NAMES utf8')
dbcurr.execute('set global wait_timeout=60000')
whileTrue:
#dbcurr.execute('select * from urlids where status=0 order by type limit1')
dbcurr.execute('select * from urlids where status=0 and type>0 limit1')
d= dbcurr.fetchall()
#print d
if d:
id = d[0][0]
uk = d[0][1]
start = d[0][2]
limit = d[0][3]
type = d[0][4]
dbcurr.execute('update urlids set status=1 where id=%s' % (str(id),))
url = ""
if type == 0:
url = URL_SHARE.format(uk=uk, start=start, id=id).encode('utf-8')
elif type == 1:
url = URL_FOLLOW.format(uk=uk, start=start, id=id).encode('utf-8')
elif type == 2:
url = URL_FANS.format(uk=uk, start=start, id=id).encode('utf-8')
if url:
hc_q.put((type, url))
#print "processed", url
else:
dbcurr.execute('select * from user where status=0 limit 1000')
d = dbcurr.fetchall()
if d:
for item in d:
try:
dbcurr.execute('insertinto urlids(uk, start, limited, type, status) values("%s", 0, %s, 0,0)' % (item[1], str(ONESHAREPAGE)))
dbcurr.execute('insertinto urlids(uk, start, limited, type, status) values("%s", 0, %s, 1,0)' % (item[1], str(ONEPAGE)))
dbcurr.execute('insertinto urlids(uk, start, limited, type, status) values("%s", 0, %s, 2,0)' % (item[1], str(ONEPAGE)))
dbcurr.execute('updateuser set status=1 where userid=%s' % (item[1],))
except Exception as ex:
print "E6",str(ex)
else:
time.sleep(1)
dbconn.commit()
dbcurr.close()
dbconn.close()
for item in range(16):
t =threading.Thread(target = req_worker, args = (item,))
t.setDaemon(True)
t.start()
s = threading.Thread(target = worker, args =())
s.setDaemon(True)
s.start()
response_worker()
|
|
|