设为首页 收藏本站
查看: 1886|回复: 0

[经验分享] Hadoop数据收集与入库系统Flume与Sqoop

[复制链接]

尚未签到

发表于 2017-12-17 23:45:19 | 显示全部楼层 |阅读模式
  Hadoop提供了一个中央化的存储系统,其有利于进行集中式的数据分析与数据共享。 Hadoop对存储格式没有要求。可以存储用户访问日志、产品信息以及网页数据等数据。
  常见的两种数据来源。一种是分散的数据源:机器产生的数据、用户访问日志以及用户购买日志。另一种是传统系统中的数据:传统关系型数据库(MySQL、Oracle)、磁盘阵列以及磁带。
DSC0000.jpg

  Flume由三部分构成。Master负责负责通信及配置管理,是集群的控制器。Collector用于对数据进行聚合。往往会产生一个更大的数据流。然后加载到HDFS上。Agent负责采集数据。其是Flume中产生数据流的地方,同时Agent会将产生的数据传输到Collector.
  Agent 用于采集数据是数据流产生的地方。通常由source和sink两部分组成 。Source用于获取数据,可从文本文件,syslog,HTTP等获 取数据。 Sink将Source获得的数据进一步传输给后面的Collector。 Flume自带了很多source和sink实现。 syslogTcp(5140) | agentSink("localhost",35853) 意思是通过通信协议获取5140端口数据,然后发送到localhost主机的35853端口。 tail("/etc/services") | agentSink("localhost",35853) 意思是读取文件夹/etc/services下文件,然后发送到localhost主机的35853端口。
  Collector 用于汇总多个Agent结果 ,将汇总结果导入后端存储系统,比如HDFS,HBase。 Flume自带了很多collector实现 collectorSource(35853) | console  表示将收集结果写到控制台。CollectorSource(35853) | collectorSink("file:///tmp/flume/collected", "syslog") 表示将收集结果写到本地目录。collectorSource(35853) | collectorSink("hdfs://namenode/user/flume/ ","syslog"); 表示将收集结果写到hdfs目录。
DSC0001.jpg

  一般为了防止一个collector挂掉所有agent都失效可以将不同的agent连接不同的collector。同时,这样可以保证不同数据类型的agent将数据放到同一个collector。
  Master 负责管理协调 agent 和collector的配置信息,是Flume集群的控制器。其可跟踪数据流的最后确认信息,并通知agent。 通常需配置多个master以防止单点故障。借助zookeeper管理管理多Master。
  构建基于Flume的数据收集系统
  Agent和Collector均可以动态配置。可通过命令行或Web界面配置。 命令行配置 : 在已经启动的master节点上,依次输入”flume shell””connect localhost ”    如执行 exec config a1 ‘tailDir(“/data/logfile”)’ ‘agentSink’ 。 Web界面 : 选中节点,填写source、sink等信息。
  常用架构举例—拓扑1  
   DSC0002.jpg
  agentA : tail(“/ngnix/logs”) | agentSink("collector",35853);
  agentB : tail(“/ngnix/logs”) | agentSink("collector",35853);
  agentC : tail(“/ngnix/logs”) | agentSink("collector",35853);
  agentD : tail(“/ngnix/logs”) | agentSink("collector",35853);
  agentE : tail(“/ngnix/logs”) | agentSink("collector",35853);
  agentF : tail(“/ngnix/logs”) | agentSink("collector",35853); collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/","srcdata");
DSC0003.jpg

  agentA : src | agentE2ESink("collectorA",35853);
  agentB : src | agentE2ESink("collectorA",35853);
  agentC : src | agentE2ESink("collectorB",35853);
  agentD : src | agentE2ESink("collectorB",35853);
  agentE : src | agentE2ESink("collectorC",35853);
  agentF : src | agentE2ESink("collectorC",35853);
  collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
DSC0004.jpg

  agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
  agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
  agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
  agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
  agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
  agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
  collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
  collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
  传统数据库与Hadoop间数据同步
  Sqoop:SQL-to-Hadoop,是连接传统关系型数据库和Hadoop 的桥梁。可以把关系型数据库的数据导入到 Hadoop 系统 ( 如 HDFS HBase 和 Hive) 中或把数据从 Hadoop 系统里抽取并导出到关系型数据库里/利用MapReduce可以加快数据传输速度实现批处理方式进行数据传输。
  Sqoop优势是高效、可控地利用资源、任务并行度,超时时间等 、数据类型映射与转换 可自动进行或用户也可自定义 。同时,其 支持多种数据库如:MySQL 、Oracle 以及PostgreSQL 。
DSC0005.jpg

  通过命令sqoop可以生成map task将传统数据库中的数据传输到HDFS、Hbase或Hive上。
  Sqoop import 将数据从关系型数据库导入Hadoop中
   DSC0006.jpg
  
  步骤1:Sqoop与数据库Server通信,获取数据库表的元数据 信息; 步骤2:Sqoop启动一个MapOnly的MR作业,利用元数据信 息并行将数据写入Hadoop。
  Sqoop import使用:
  sqoop import \     
  --connect jdbc:mysql://mysql.example.com/sqoop \   
  --username sqoop \     
  --password sqoop \     
  --table cities
  --connnect: 指定JDBC URL
  --username/password:mysql数据库的用户名
  --table:要读取的数据库表
  bin/hadoop fs -cat cities/part-m-*

  1,USA,Palo>  2,Czech Republic,Brno
  3,USA,Sunnyvale
  导入到指定目录:
  sqoop import \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \   
  --password sqoop \     
  --table cities \     
  --target-dir /etl/input/cities
  导入字段country为USA的值:
  sqoop import \   
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     
  --where "country = 'USA'"
  生成顺序文件:
  sqoop import \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     
  --as-sequencefile
  指定map个数:
  sqoop import \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     
  --num-mappers 10
  Sqoop import—导入多个表 :
  sqoop import \   
  --connect jdbc:mysql://mysql.example.com/sqoop \  
  --username sqoop \   
  --password sqoop \   
  --query 'SELECT normcities.id, \                  
  countries.country, \                  
  normcities.city \                  
  FROM normcities \                  
  JOIN countries USING(country_id) \                  
  WHERE $CONDITIONS' \

  --split-by>  --target-dir cities
  Sqoop import增量导入(一):
  适用于数据每次被追加到数据库中,而已有数据不变的情况且仅导入id这一列值大于1的记录。
  sqoop import \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \   
  --password sqoop \     
  --table visits \     
  --incremental append \     

  --check-column>  --last-value 1
  Sqoop import增量导入(二)
  每次成功运行后,sqoop将最后一条记录的id值保存到 metastore中,供下次使用。
  sqoop job \   
  --create visits \   
  --import \   
  --connect jdbc:mysql://mysql.example.com/sqoop \   
  --username sqoop \   
  --password sqoop \   
  --table visits \   
  --incremental append \   

  --check-column>  --last-value 0
  运行sqoop作业:sqoop job --exec visits
  Sqoop import增量导入(三)
  数据库中有一列last_update_date,记录了上次修改时间。 Sqoop仅将某时刻后的数据导入Hadoop。
  sqoop import \   
  --connect jdbc:mysql://mysql.example.com/sqoop \   
  --username sqoop \   
  --password sqoop \   
  --table visits \   
  --incremental lastmodified \   
  --check-column last_update_date \   
  --last-value “2013-05-22 01:01:01”
  Sqoop Export 将数据从Hadoop导入关系型数据库导中
   DSC0007.jpg
  步骤1:Sqoop与数据库Server通信,获取数据库表的元数据 信息; 步骤2:并行导入数据:将Hadoop上文件划分成若 干个split; 每个split由一个Map Task进 行数据导入。
  Sqoop Export使用方法
  sqoop export \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     
  --export-dir cities
  --connnect: 指定JDBC URL
  --username/password:mysql数据库的用户名
  --table:要导入的数据库表
  export-dir:数据在HDFS上存放目录
  Sqoop Export—保证原子性:
  sqoop export \   
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     
  --staging-table staging_cities
  Sqoop Export—更新已有数据:
  sqoop export \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     

  --update-key>  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \     
  --password sqoop \     
  --table cities \     

  --update-key>  --update-mode allowinsert
  Sqoop Export—选择性插入 :
  sqoop export \     
  --connect jdbc:mysql://mysql.example.com/sqoop \     
  --username sqoop \   
  --password sqoop \     
  --table cities \     
  --columns country,city
  Sqoop与其他系统结合
     Sqoop可以与Oozie、Hive、Hbase等系统结合;  用户需要在sqoop-env.sh中增加HBASE_HOME、 HIVE_HOME等环境变量。 Sqoop与Hive结合:  sqoop import \     
--connect jdbc:mysql://mysql.example.com/sqoop \     --username sqoop \     --password sqoop \     --table cities \     --hive-import  Sqoop与HBase结合:
  sqoop import \     
--connect jdbc:mysql://mysql.example.com/sqoop \     --username sqoop \     --password sqoop \     --table cities \     --hbase-table cities \     --column-family world       以上介绍了数据收集系统以及数据传输工具sqoop,后续将讲解Hive及Pig相关主题。来源: http://lib.csdn.net/article/hadoop/62796

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425205-1-1.html 上篇帖子: 使用 Hadoop 进行语料处理(面试题) 下篇帖子: hadoop归档、压缩、串行化系统,序列文件
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表