[Experience Sharing] Distributed web page scraping with Python (3): Implementation

  Heh, the previous two parts didn't really have much to do with Python. This part is pretty much all code.
  This is the first Python I have ever written, and quite a few places are messy, so focus mainly on the logic flow.
  Character encoding genuinely gave me a headache. A downloaded page's encoding is unknown up front, so I first look for the charset declaration and then convert everything to Unicode. All processing is done in Unicode, but the database uses utf8, the Windows console has to be GBK, and my IDE console has to be UTF-8. That is the only reason the DEBUG variable exists: it controls the output encoding.
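  To make that concrete, here is a minimal Python 2 sketch of the idea, with illustrative helper names and sample GBK bytes that are not taken from the crawler itself: detect the declared charset, decode to Unicode as early as possible, and only encode at the output boundary, switching the target encoding with a DEBUG-style flag.

# -*- coding: utf-8 -*-
# Minimal sketch of the encoding flow (illustrative names, not the crawler's own API).
import re

DEBUG = 0  # 0: Windows console wants GBK; 1: IDE console wants UTF-8

def detect_charset(raw_html):
    # look for a charset=... declaration in the raw bytes; fall back to utf-8
    m = re.search(r'charset="?([-\w]+)', raw_html)
    return m.group(1) if m else 'utf-8'

def to_console(text):
    # keep everything unicode internally; encode only when printing
    if DEBUG == 0:
        print text.encode('gbk', 'replace')
    else:
        print text.encode('utf-8')

raw = '<meta charset="gbk"><title>\xb1\xea\xcc\xe2</title>'  # sample GBK bytes ("标题")
page = unicode(raw, detect_charset(raw), 'replace')           # unicode from here on
to_console(page)                                              # GBK or UTF-8 depending on DEBUG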
  The program ran continuously for 24 hours and was then deployed across 10 machines in a distributed setup; long-running operation has been basically problem-free.
  Going forward it will crawl about 100,000 pages per day.
  The source code follows:
  Content scraping and utilities
# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import urllib2
import re
import socket

DEBUG = 0  # 0: output GBK for the Windows console, 1: output UTF-8 for the IDE console


# Utility class
class Tools():

    # Logging helper; re-encodes the message to match the console (see DEBUG above)
    @staticmethod
    def writelog(level, info, notify=False):
        if DEBUG == 0:
            try:
                print "[" + level + "]" + info.decode('UTF-8').encode('GBK')
            except:
                print "[" + level + "]" + info.encode('GBK')
        else:
            print "[" + level + "]" + info
        #if notify:
        #    print "[notify]报告管理员!!"

    # Convert a byte string to unicode using the detected charset
    @staticmethod
    def toUnicode(s, charset):
        if charset == "":
            return s
        else:
            try:
                u = unicode(s, charset)
            except:
                u = ""
            return u

    # Regex extraction
    # @param single  only grab the first match
    @staticmethod
    def getFromPatten(patten, src, single=False):
        rst = ""
        p = re.compile(patten, re.S)
        all = p.findall(src)
        for matcher in all:
            rst += matcher + " "
            if single:
                break
        return rst.strip()


# Page downloader
class PageGripper():
    URL_OPEN_TIMEOUT = 10  # page timeout (seconds)
    MAX_RETRY = 3          # maximum number of retries

    def __init__(self):
        socket.setdefaulttimeout(self.URL_OPEN_TIMEOUT)

    # Detect the charset declared in a line of HTML
    def getCharset(self, s):
        rst = Tools.getFromPatten(u'charset=(.*?)"', s, True)
        if rst != "":
            if rst == "utf8":
                rst = "utf-8"
        return rst

    # Try to fetch a page
    def downloadUrl(self, url):
        charset = ""
        page = ""
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(url)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP状态错误 code=%d' % e.code)
                raise
            except urllib2.URLError, e:   # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    raise
        while True:
            line = fp.readline()
            if charset == "":
                charset = self.getCharset(line)
            if not line:
                break
            page += Tools.toUnicode(line, charset)
        fp.close()
        return page

    # Fetch a page, logging the attempt
    def getPageInfo(self, url):
        Tools.writelog("info", "开始抓取网页,url= " + url)
        info = ""
        try:
            info = self.downloadUrl(url)
        except:
            raise
        Tools.writelog("debug", "网页抓取成功")
        return info


# Content extractor
class InfoGripper():
    pageGripper = PageGripper()

    def __init__(self):
        Tools.writelog('debug', "爬虫启动")

    # Extract the title
    def griptitle(self, data):
        title = Tools.getFromPatten(u'box2t sp"><h3>(.*?)</h3>', data, True)
        if title == "":
            title = Tools.getFromPatten(u'<title>(.*?)[-<]', data, True)
        return title.strip()

    # Extract the channel
    def gripchannel(self, data):
        zone = Tools.getFromPatten(u'频道:(.*?)</span>', data, True)
        channel = Tools.getFromPatten(u'<a.*?>(.*?)</a>', zone, True)
        return channel

    # Extract the tags
    def griptag(self, data):
        zone = Tools.getFromPatten(u'标签:(.*?)</[^a].*>', data, True)
        rst = Tools.getFromPatten(u'>(.*?)</a>', zone, False)
        return rst

    # Extract the view count
    def gripviews(self, data):
        rst = Tools.getFromPatten(u'已经有<em class="hot" id="viewcount">(.*?)</em>次观看', data)
        return rst

    # Extract the publish time
    def griptime(self, data):
        rst = Tools.getFromPatten(u'在<em>(.*?)</em>发布', data, True)
        return rst

    # Extract the uploader
    def gripuser(self, data):
        rst = Tools.getFromPatten(u'title="点击进入(.*?)的用户空间"', data, True)
        return rst

    # Detect the page charset
    def getPageCharset(self, data):
        charset = Tools.getFromPatten(u'charset=(.*?)"', data, True)
        if charset == "utf8":
            charset = "utf-8"
        return charset

    # Extract the bokecc (CC) player data
    def getCCData(self, data):
        zone = Tools.getFromPatten(u'SWFObject(.*?)</script>', data, True)
        # is the video served through bokecc?
        isFromBokeCC = re.match('.*bokecc.com.*', zone)
        if not isFromBokeCC:
            return "", ""
        ccSiteId = Tools.getFromPatten(u'siteid=(.*?)[&,"]', zone, True)
        ccVid = Tools.getFromPatten(u'vid=(.*?)[&,"]', zone, True)
        return ccSiteId, ccVid

    # Extract the site-local video id
    def gripVideoId(self, data):
        vid = Tools.getFromPatten(u'var vid = "(.*?)"', data, True)
        return vid

    # Fetch the view count via the site's AJAX interface
    def gripViewsAjax(self, vid, url, basedir):
        host = Tools.getFromPatten(u'http://(.*?)/', url, True)
        ajaxAddr = "http://" + host + basedir + "/index.php/ajax/video_statistic/" + vid
        '''
        try:
            content = self.pageGripper.getPageInfo(ajaxAddr)
        except Exception,e:
            print e
            Tools.writelog ("error", ajaxAddr+u"抓取失败")
            return "error"
        '''
        Tools.writelog('debug', u"开始获取点击量,url=" + ajaxAddr)
        retry = 0
        while True:
            try:
                fp = urllib2.urlopen(ajaxAddr)
                break
            except urllib2.HTTPError, e:  # HTTP status error
                Tools.writelog('error', 'HTTP状态错误 code=%d' % e.code)
                return ""
            except urllib2.URLError, e:   # network error / timeout
                Tools.writelog('warn', '页面访问超时,重试..')
                retry += 1
                if retry > self.pageGripper.MAX_RETRY:
                    Tools.writelog('warn', '超过最大重试次数,放弃')
                    return ""
        content = fp.read()
        fp.close()
        views = Tools.getFromPatten(u'"viewcount":(.*?),', content, True)
        views = views.replace('"', '')
        return views

    # Extract the view count directly from the page content
    def gripViewsFromData(self, data):
        views = Tools.getFromPatten(u'已经有<.*?>(.*?)<.*?>次观看', data, True)
        return views

    def gripBaseDir(self, data):
        dir = Tools.getFromPatten(u"base_dir = '(.*?)'", data, True)
        return dir

    # Scrape everything for one URL
    def gripinfo(self, url):
        try:
            data = self.pageGripper.getPageInfo(url)
        except:
            Tools.writelog("error", url + " 抓取失败")
            raise
        Tools.writelog('info', '开始内容匹配')
        rst = {}
        rst['title'] = self.griptitle(data)
        rst['channel'] = self.gripchannel(data)
        rst['tag'] = self.griptag(data)
        rst['release'] = self.griptime(data)
        rst['user'] = self.gripuser(data)
        ccdata = self.getCCData(data)
        rst['ccsiteId'] = ccdata[0]
        rst['ccVid'] = ccdata[1]
        views = self.gripViewsFromData(data)
        if views == "" or not views:
            vid = self.gripVideoId(data)
            basedir = self.gripBaseDir(data)
            views = self.gripViewsAjax(vid, url, basedir)
            if views == "":
                views = "error"
        if views == "error":
            Tools.writelog("error", "获取观看次数失败")
        Tools.writelog("debug", "点击量:" + views)
        rst['views'] = views
        Tools.writelog('debug', 'title=%s,channel=%s,tag=%s' % (rst['title'], rst['channel'], rst['tag']))
        return rst


# Unit test
if __name__ == '__main__':
    list = ['http://008yx.com/xbsp/index.php/video/index/3138',
            'http://vblog.xwhb.com/index.php/video/index/4067',
            'http://demo.ccvms.bokecc.com/index.php/video/index/3968',
            'http://vlog.cnhubei.com/wuhan/20100912_56145.html',
            'http://vlog.cnhubei.com/html/js/30271.html',
            'http://www.ddvtv.com/index.php/video/index/15',
            'http://boke.2500sz.com/index.php/video/index/60605',
            'http://video.zgkqw.com/index.php/video/index/334',
            'http://yule.hitmv.com/html/joke/27041.html',
            'http://www.ddvtv.com/index.php/video/index/11',
            'http://www.zgnyyy.com/index.php/video/index/700',
            'http://www.kdianshi.com/index.php/video/index/5330',
            'http://www.aoyatv.com/index.php/video/index/127',
            'http://v.ourracing.com/html/channel2/64.html',
            'http://v.zheye.net/index.php/video/index/93',
            'http://vblog.thmz.com/index.php/video/index/7616',
            'http://kdianshi.com/index.php/video/index/5330',
            'http://tv.seeyoueveryday.com/index.php/video/index/95146',
            'http://sp.zgyangzhi.com/html/ji/2.html',
            'http://www.xjapan.cc/index.php/video/index/146',
            'http://www.jojy.cn/vod/index.php/video/index/399',
            'http://v.cyzone.cn/index.php/video/index/99',
            ]
    list1 = ['http://192.168.25.7:8079/vinfoant/versionasdfdf']
    infoGripper = InfoGripper()
    for url in list:
        infoGripper.gripinfo(url)
    del infoGripper
  Web service and task scheduling
# -*- coding: utf-8 -*-
'''
Created on 2010-9-15
@author: chenggong
'''
import string, cgi, time
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from InfoGripper import *
import re
import MySQLdb
import threading
import urllib
import urllib2

PORT = 8079
VERSION = 0.1
DBCHARSET = "utf8"
PARAMS = ['callback', 'sessionId', 'retry', 'retryInterval', 'dbhost',
          'dbport', 'db', 'dbuser', 'dbpass', 'videoId']
DBMAP = ['video_id', 'ccsiteid', 'ccvid', 'desc_url', 'site_id', 'title',
         'post_time', 'author', 'elapse', 'channel', 'tags', 'create_time',
         'check_time', 'status']

# ERROR CODE definitions
ERR_OK = 0
ERR_PARAM = 1
ERR_HTTP_TIMEOUT = 5
ERR_HTTP_STATUS = 6
ERR_DB_CONNECT_FAIL = 8
ERR_DB_SQL_FAIL = 9
ERR_GRIPVIEW = 11
ERR_UNKNOW = 12


# Database adapter
class DBAdapter(object):
    def __init__(self):
        self.param = {'ip': '', 'port': 0, 'user': '', 'pw': '', 'db': ''}
        self.connect_once = False  # has a connection ever been opened?

    # Create / refresh the database connection
    def connect(self, ip, port, user, pw, db):
        if (ip != self.param['ip'] or
            port != self.param['port'] or
            user != self.param['user'] or
            pw != self.param['pw'] or
            db != self.param['db']):
            Tools.writelog('info', '更换数据库连接池,ip=' + ip + ',port=' + port + ',user=' + user + ',pw=' + pw + ',db=' + db)
            try:
                if self.connect_once == True:  # release the previous connection
                    self.cur.close()
                    self.conn.close()
                self.conn = MySQLdb.connect(user=user, passwd=pw, db=db, host=ip, port=int(port))
                self.conn.set_character_set(DBCHARSET)
                self.connect_once = True
                self.cur = self.conn.cursor(MySQLdb.cursors.Cursor)
                self.param['ip'] = ip
                self.param['port'] = port
                self.param['user'] = user
                self.param['pw'] = pw
                self.param['db'] = db
            except:
                Tools.writelog('error', u'数据库连接失败', True)
                raise
            else:
                Tools.writelog('info', u'数据库连接成功')

    # Execute one SQL statement
    def execute(self, sql):
        Tools.writelog('debug', u'执行SQL: ' + sql)
        try:
            self.cur.execute(sql)
        except:
            Tools.writelog('error', u'SQL执行错误:' + sql)
            raise

    # Query and fetch all rows
    def query(self, sql):
        row = {}
        self.execute(sql)
        row = self.cur.fetchall()
        return row

    # Mark a video as failed
    def updateErr(self, videoId):
        nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
        sql = "UPDATE videos SET "
        sql += "check_time='" + nowtime + "',"
        sql += "status=-1 "
        sql += "WHERE video_id=" + videoId
        self.execute(sql)
        self.conn.commit()

    # Write the scraped result back
    def update(self, obj, videoId, isUpdateTitle=True):
        Tools.writelog('debug', '开始更新数据库')
        try:
            # update the videos table
            sql = "UPDATE videos SET "
            if obj['ccsiteId'] != "":
                sql += "ccsiteid='" + obj['ccsiteId'] + "',"
            if obj['ccVid'] != "":
                sql += "ccvid='" + obj['ccVid'] + "',"
            if isUpdateTitle:
                sql += "title='" + obj['title'] + "',"
            sql += "post_time='" + obj['release'] + "',"
            sql += "author='" + obj['user'] + "',"
            sql += "channel='" + obj['channel'] + "',"
            sql += "tags='" + obj['tag'] + "',"
            nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time()))
            sql += "check_time='" + nowtime + "',"
            sql += "status=0 "
            sql += "WHERE video_id=" + videoId
            self.execute(sql)
            # update the counts table
            if obj['views'] != 'error':
                nowdate = time.strftime('%Y-%m-%d', time.localtime(time.time()))
                sql = "SELECT * FROM counts WHERE "
                sql += "date = '" + nowdate + "' and video_id=" + videoId
                rst = self.query(sql)
                if len(rst) > 0:
                    # a record already exists for today, update it
                    sql = "UPDATE counts SET count=" + obj['views']
                    sql += " WHERE video_id=" + videoId + " AND date='" + nowdate + "'"
                else:
                    # otherwise insert a new one
                    sql = "INSERT INTO counts VALUES"
                    sql += "(null," + videoId + ",'" + nowdate + "'," + obj['views'] + ")"
                self.execute(sql)
            self.conn.commit()
            Tools.writelog('debug', "db commit ok")
            return ERR_OK
        except Exception, e:
            print e
            return ERR_DB_SQL_FAIL


# Task thread: one crawl job per request
class TaskThread(threading.Thread):
    def setTaskTool(self, dbAdapter, gripper):
        self.dbAdapter = dbAdapter
        self.gripper = gripper

    def setParam(self, param):
        self.param = param
        self.videoId = param['videoId']
        assert self.videoId != ""

    def init(self):
        self.views = "0"
        self.errcode = ERR_OK

    def run(self):
        Tools.writelog('debug', '开始爬虫任务,sessionId=' + self.param['sessionId'])
        self.init()
        # refresh the database connection
        try:
            self.dbAdapter.connect(self.param['dbhost'], self.param['dbport'],
                                   self.param['dbuser'], self.param['dbpass'],
                                   self.param['db'])
        except:
            self.errcode = ERR_DB_CONNECT_FAIL  # database connection failed
            self.callback(self.errcode)
            return
        # look up the video with this id
        sql = "SELECT "
        for column in DBMAP:
            sql += column
            if column != DBMAP[len(DBMAP) - 1]:
                sql += ","
        sql += " FROM videos"
        sql += " WHERE video_id=" + self.videoId
        video = self.dbAdapter.query(sql)
        assert not (len(video) > 1 or len(video) == 0)  # exactly one record expected
        url = video[0][3]
        assert url != ""
        try:
            rst = self.gripper.gripinfo(url)
        except urllib2.HTTPError, e:
            self.errcode = ERR_HTTP_STATUS   # HTTP status error
            self.dbAdapter.updateErr(self.videoId)
        except urllib2.URLError, e:
            self.errcode = ERR_HTTP_TIMEOUT  # HTTP timeout
            self.dbAdapter.updateErr(self.videoId)
        except:
            self.errcode = ERR_UNKNOW        # unknown error
            self.dbAdapter.updateErr(self.videoId)
        else:
            self.views = rst['views']
            if self.views == "error":
                self.views = "-1"
                self.errcode = ERR_GRIPVIEW  # page scraped, but view count failed
            # special case: if the existing title contains "-", do not overwrite the title column
            title = video[0][5]
            assert title != ""
            if re.match('.*-.*', title):
                self.errcode = self.dbAdapter.update(rst, self.videoId, False)
            else:
                self.errcode = self.dbAdapter.update(rst, self.videoId)
        self.callback(self.errcode)
        Tools.writelog('info', '任务结束,sessionId=' + self.param['sessionId'])
        return

    # Report the result back to the master
    def callback(self, errcode):
        results = {'errorcode': errcode, 'count': int(self.views)}
        results = urllib.urlencode(results)
        results = results.replace('&', '%26')
        url = self.param['callback']
        url += "?"
        url += "sessionId=" + self.param['sessionId']
        url += "&results=" + results
        retry = 0
        while True:
            try:
                Tools.writelog('debug', "回调主控,url=" + url)
                urllib2.urlopen(url)
                Tools.writelog('debug', '回调成功')
                break
            except urllib2.URLError, e:  # timeout / error
                Tools.writelog('debug', '回调主控超时,%s秒后重试' % self.param['retryInterval'])
                retry += 1
                time.sleep(int(self.param['retryInterval']))
                if retry > int(self.param['retry']):
                    Tools.writelog('error', '回调主控失败')
                    return


# Web service handler
class MyHandler(BaseHTTPRequestHandler):
    dbAdapter = DBAdapter()
    gripper = InfoGripper()

    def pageSuccess(self):
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def pageFail(self):
        self.send_error(404, "not found")

    # Pull one parameter out of the query string
    def getValue(self, param):
        src = self.path + '&'
        reg = param + '=' + '(.*?)&'
        value = Tools.getFromPatten(reg, src, True)
        return value

    def do_GET(self):
        isGetVersion = re.match('.*vinfoant/version.*', self.path)
        isTask = re.match('.*vinfoant/run.*', self.path)
        if isGetVersion:
            self.pageSuccess()
            self.wfile.write(str(VERSION))
        elif isTask:
            self.pageSuccess()
            param = {}
            for p in PARAMS:
                param[p] = self.getValue(p)  # collect all parameters
            taskThread = TaskThread()
            taskThread.setTaskTool(self.dbAdapter, self.gripper)
            taskThread.setParam(param)
            taskThread.start()  # start the task thread
            self.wfile.write("ok")
        else:
            self.pageFail()
        return


# Start the web service (global entry point)
def startHttpd():
    try:
        Tools.writelog('debug', 'httpd start..listen on ' + str(PORT))
        httpd = HTTPServer(('', PORT), MyHandler)
        Tools.writelog('debug', 'success')
        httpd.serve_forever()
    except KeyboardInterrupt:
        Tools.writelog('debug', 'httpd close..')
        httpd.socket.close()


if __name__ == '__main__':
    startHttpd()
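  For completeness, here is a sketch of how a master node might hand one task to this worker. The host, port and parameter values below are made up for illustration; the parameter names are the ones listed in PARAMS, and the values are passed unescaped because getValue() extracts them with a plain regex instead of URL-decoding them.

# Illustrative dispatch call from a master/controller node (values are hypothetical).
import urllib2

task = {
    'callback': 'http://192.168.25.1/master/report',  # hypothetical master callback URL
    'sessionId': '42',
    'retry': '3',
    'retryInterval': '10',
    'dbhost': '192.168.25.2',
    'dbport': '3306',
    'db': 'video',
    'dbuser': 'crawler',
    'dbpass': 'secret',
    'videoId': '3138',
}
# values go in raw (no urlencode): the worker's getValue() just regex-matches "name=...&"
query = '&'.join('%s=%s' % (k, v) for k, v in task.items())
print urllib2.urlopen('http://127.0.0.1:8079/vinfoant/run?' + query).read()  # prints "ok"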
