设为首页 收藏本站
查看: 2021|回复: 0

[经验分享] Hive读取Flume正在写入的HDFS

[复制链接]

尚未签到

发表于 2017-5-22 06:01:36 | 显示全部楼层 |阅读模式
  Hive的表创建为外部分区表,例如:
  USE mydb;
  CREATE EXTERNAL TABLE mytable
  ( 
  c1 String,
  c2 INT,
  c3 INT,
  create_time String
  )
  PARTITIONED BY (dt STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '|||';
  然后创建分区,如:
  ALTER TABLE mytable ADD PARTITION (dt = '2013-09-25') LOCATION '/data/mytable/2013-09-25/';
  ALTER TABLE mytable ADD PARTITION (dt = '2013-09-26') LOCATION '/data/mytable/2013-09-26/';
  ALTER TABLE mytable ADD PARTITION (dt = '2013-09-27') LOCATION '/data/mytable/2013-09-27/';
  即Hive的表按天进行分区。指定到相应目录。
  删除分区

alter table mytable drop partition (dt='2012-03-06')  注意hdfs的起始路径,从hdfs的根目录开始,不然会加载不到数据。
  分区可以写成脚本每天自动执行。处理昨天的数据

yesterday=$(date -d '-1 day' '+%Y-%m-%d')
$HIVE_HOME/bin/hive -e "use mydb;ALTER TABLE mytable ADD PARTITION (dt = '$yesterday') LOCATION '/user/hive/warehouse/tail/$yesterday/';"

  而Flume中配置将数据保存到HDFS中,即HDFS sink。计划每天一个文件,进行日切。如2013-09-25对应的文件就保存在:
  hdfs://<hive.metastore.warehouse.dir>/data/mytable/2013-09-25/FlumeData.xxx
  这样,只要文件生成,就能直接通过操作Hive的mytable表来对文件进行统计了。
  业务上要求统计工作是按照小时进行,考虑到按照小时进行分区过于细化,而且会导致过多的文件给NameNode造成内存压力,所以如上Hive层面按天进行划分。
  统计执行时首先指定天分区,然后根据create_time(mm:hh:ss)指定统计时间段,如:
  SELECT c1,
  SUM(c2),
  SUM(c3)
  FROM mytable
  WHERE dt = ’2013-09-25′
  AND create_time BETWEEN ’22:00:00′ AND ’22:59:59′
  GROUP BY c1;
  在实践的过程中遇到如下两个问题:
  1.对于正在写入的文件,通过hadoop fs -ls 命令查看,其大小始终是0,即使通过hadoop fs -cat可以看到实际已经有内容存在!通过hive处理的话也看不到其中的数据。
  2.Flume正在写入的文件,默认会有.tmp后缀。如果Hive在执行过程中,Flume切换文件,即将xxx.tmp重命名为xxx,这时Hive会报错如file not found xxx.tmp。
  针对问题1
  首先了解HDFS的特点:
  HDFS中所有文件都是由块BLOCK组成,默认块大小为64MB。在我们的测试中由于数据量小,始终在写入文件的第一个BLOCK。而HDFS与一般的POSIX要求的文件系统不太一样,其文件数据的可见性是这样的:
  如果创建了文件,这个文件可以立即可见;
  写入文件的数据则不被保证可见了,哪怕是执行了刷新操作(flush/sync)。只有数据量大于1个BLOCK时,第一个BLOCK的数据才会被看到,后续的BLOCK也同样的特性。正在写入的BLOCK始终不会被其他用户看到!
  HDFS中的sync()保证数据持久化到了datanode上,然后可以被其他用户看到。
  针对HDFS的特点,可以解释问题1中的现象,正在写入无法查看。但是使用Hive统计时Flume还在写入那个BLOCK(数据量小的时候),那岂不是统计不到信息?
  解决方案:
  每天再按小时切分文件——这样虽然每天文件较多,但是能够保证统计时数据可见!Flume上的配置项为hdfs.rollInterval。
  如果文件数多,那么还可以考虑对以前的每天的小时文件合并为每天一个文件!
  针对问题2
  原因比较明显,Hive处理前获取了对应分区下的所有文件信息,其中包含xxx.tmp文件,而传递给MapReduce处理时,由于Flume进行了切换,导致原来的xxx.tmp变成了xxx,新的.tmp名称又变成了yyy.tmp,这样自然找不到xxx.tmp了。
  解决方案:
  解决这个问题想法之一是想控制Hive的处理时机,但是显然不是那么好控制。
  进一步了解到HDFS的Java API读取HDFS文件时,会忽略以”.”和”_”开头的文件!类似于Linux中默认.xx是隐藏的一样,应用程序读取HDFS文件时默认也不读取.xxx和_xxx这样名称的文件!
  这样就产生了针对问题2的处理方案一)配置Flume,针对正在写入的文件,以.号开头。涉及Flume配置项hdfs.inUsePrefix。

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'a
#agent section  
producer.sources = s
producer.channels = c
producer.sinks = r
#producer.sources.s.type = seq
producer.sources.s.channels = c
producer.sources.s.type = exec
producer.sources.s.command=tail -n 0 -F /usr/local/nginx/nginxlog/access.log
producer.sources.s.deletePolicy=never
#producer.sources.s.type = avro
#producer.sources.s.bind = localhost
#producer.sources.s.port = 10000
# Each sink's type must be defined(给谁了)
#producer.sinks.r.type = avro
#producer.sinks.r.hostname = 10.1.1.100
#producer.sinks.r.port = 20000
producer.sources.source1.interceptors = i1
producer.sources.source1.interceptors.i1.type = timestamp
producer.sinks.r.type = hdfs
producer.sinks.r.hdfs.path = hdfs://localhost:8010/user/hive/warehouse/tail/%Y-%m-%d
producer.sinks.r.hdfs.inUsePrefix = .
producer.sinks.r.hdfs.maxOpenFiles = 5000
producer.sinks.r.hdfs.batchSize= 1000
producer.sinks.r.hdfs.fileType = DataStream
producer.sinks.r.hdfs.writeFormat =Text
producer.sinks.r.hdfs.rollSize = 128000000
producer.sinks.r.hdfs.rollCount = 0
producer.sinks.r.hdfs.rollInterval = 3600
producer.sinks.r.hdfs.useLocalTimeStamp = true
producer.sinks.r.request.required.acks=1
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
#Specify the channel the sink should use
producer.sinks.r.channel = c
# Each channel's type is defined.
producer.channels.c.type = memory  
producer.channels.c.capacity = 1000000
producer.channels.c.transactionCapacity = 1000
#producer.channels.c.type=file
#producer.channels.c.checkpointDir=/usr/local/flumeng/checkpointdir/tcpdir/example_agent
#producer.channels.c.dataDirs=/usr/local/flumeng/datadirs/tddirs/example_agen

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379670-1-1.html 上篇帖子: 使用Flume NG构建数据收集系统(第一部分 Flume介绍) 转载 下篇帖子: flume知识进阶 三组件支持的各种格式介绍
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表