设为首页 收藏本站
查看: 562|回复: 0

[经验分享] hadoop集群同步实现

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2016-9-23 09:24:35 | 显示全部楼层 |阅读模式
                      #!/usr/bin/env python
#coding=utf-8
#scribe日志接收存在小集群到大集群之间, distcp 同步失败的情况,需要手动进行补入。
#1、如果查询补入的日志量少,则可以之间用脚本处理。如果量大,则使用 hadoop 提交job。
# hadoop job 提交方式:
#   hadoop jar /usr/local/hadoop-2.4.0/share/hadoop/tools/lib/hadoop-distcp-2.4.0.jar -m 100 hdfs://scribehadoop/scribelog/common_act/2016/08/02/13/  /file/realtime/distcpv2/scribelog/common_act/2016/08/02/13  --update

#  --update 参数表示如果目标地址目录存在,则更新该目录中的内容。

#手动同步脚本使用方法: python manual_check_sync.py  dst_path  
#脚本完成大集群和小集群之间的目录大小比较,目录文件比较。 输出差异文件列表。最后完成同步入库。



import sys,os,commands,re
import logging,logging.handlers

Module=sys.argv[1].split("scribelog")[1]


logger1 = logging.getLogger('mylogger1')
logger1.setLevel(logging.DEBUG)
# 创建一个handler,用于写入日志文件
filehandle = logging.FileHandler('log/test.log',mode='a')
# 再创建一个handler,用于输出到控制台
consehandle = logging.StreamHandler()

# 定义handler的输出格式formatter
formatter = logging.Formatter('[%(asctime)s]:%(message)s',datefmt='%F %T')
#formatter = logging.Formatter('[%(asctime)s-line:%(lineno)d-%(levelname)s]:%(message)s',datefmt='%F %T')
filehandle.setFormatter(formatter)
consehandle.setFormatter(formatter)

logger1.addHandler(filehandle)
logger1.addHandler(consehandle)




little_cluster="hdfs://yz632.hadoop.com.cn/scribelog"
large_cluster="hdfs://ns1"

#logger1.info("源目录(小集群):%s%s" % ( little_cluster,Module))

logger1.info("源目录(小集群):%s%s" %(little_cluster,Module))
logger1.info("目标目录(大集群):%s%s" %(large_cluster,sys.argv[1]))



#统计目录大小等情况
du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])

#获取的值是str类型,所以需要转为list来做比较。

logger1.info("                       DIR_COUNT        FILECOUNT        CONTENTSIZE ")
logger1.info("小集群目录信息:%s" %(du_little))
logger1.info("大集群目录信息:%s" %(du_large))


#Python的str类有split方法,只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以定义多个分隔符。这里可以用单个空格作为分隔即可。
du_little=re.split('         |           | ' ,du_little)
du_large=re.split('         |           | ' ,du_large)


#du_little=du_little.split(" ")
#du_large=du_large.split(" ")


#print du_large,du_little
#print du_large[3],du_little[3]
#print du_large[6],du_little[6]
#print du_large[7],du_little[7]
#
if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:
        logger1.info("大小集群文件数量、大小一致,不需要同步")
        exit()



#如果大小不一致,取出目录下所有文件
little_list=commands.getoutput("hadoop fs -lsr " + little_cluster + Module + "|grep -v \"^d\"" )
large_list =commands.getoutput("hadoop fs -lsr " + large_cluster + sys.argv[1]+"|grep -v \"^d\"")

#logger1.info( "小集群情况:%s " %(little_list))
#logger1.info( "大集群情况:%s ")%(large_list ))



list1=[]
list2=[]
lost_list=[]
for i in  little_list.split("\n"):
        list1.append(i.split("scribelog")[1])

for i in  large_list.split("\n"):
        list2.append(i.split("scribelog")[1])


#logger1.info("小集群目录文件:",list1
#logger1.info("大集群目录文件" ,list2


logger1.info("对比大小集群文件---》未同步文件列表:")
for i in list1:
        if i not in list2:
                logger1.info(i)
                lost_list.append(i)

logger1.info(lost_list)


#拉取小集群文件到本地tmp目录中
for i in lost_list:       
        s=commands.getstatusoutput("hadoop fs -get " + little_cluster + i+" /tmp/")
        logger1.info("拉取小集群文件到本地tmp目录中%s" %s )


#入库到大集群
for i in lost_list:
        logger1.info(i)
        j=commands.getstatusoutput("sudo su - datacopy -s /bin/bash -c '/usr/local/hadoop-2.4.0/bin/hadoop fs -put  /tmp/" +i.split("/")[-1] + "  /file/realtime/distcpv2/scribelog"+i+" '" )
        logger1.info(j)
        if j[0] == 0 :
                logger1.info(" %s + 同步完成" %(i))


#clear tmp file
for i  in lost_list:
        commands.getstatusoutput("rm -f /tmp/"+i.split("/")[-1] )

#当同步目录时,可能会出现put的时候提示没有目录存在。参考赵兵的 put  -  src dest  方法,是否可以避免此类问题有待验证。



du_little=commands.getoutput("hadoop fs -count " + little_cluster + Module)
du_large=commands.getoutput("hadoop fs -count " + large_cluster + sys.argv[1])


logger1.info("                  DIR_COUNT       FILECOUNT       CONTENTSIZE ")
logger1.info("小集群目录信息:%s" %(du_little))
logger1.info("大集群目录信息:%s" %(du_large))


#Python的str类有split方法,但是这个split方法只能根据指定的某个字符分隔字符串,re模块中提供的split方法可以用来做这件事情
du_little=re.split('         |           | ' ,du_little)
du_large=re.split('         |           | ' ,du_large)


if du_little[3] == du_large[3]  and du_little[6] == du_large[6] and du_little[7] == du_large[7]:
        logger1.info("小集群文件数量、大小一致,不需要同步")
        exit()

                   


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-276271-1-1.html 上篇帖子: centos6安装hadoop 下篇帖子: centos6安装hadoop
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表