[Experience] [bigdata] Flume HDFS sink: HDFS files left unclosed

  Symptom: MapReduce jobs over the data were failing.
  Running hadoop fsck -openforwrite showed that a file had never been closed:
  [iyunv@com ~]# hadoop fsck -openforwrite /data/rc/click/mpp/15-08-05/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
  Connecting to namenode via http://com.hunantv.hadoopnamenode:50070
FSCK started by root (auth:SIMPLE) from /10.100.1.46 for path /data/rc/click/mpp/15-08-05/ at Thu Aug 06 14:05:03 CST 2015
....................................................................................................
....................................................................................................
........./data/rc/click/mpp/15-08-05/FlumeData.1438758322864 42888 bytes, 1 block(s), OPENFORWRITE:
/data/rc/click/mpp/15-08-05/FlumeData.1438758322864:  Under replicated BP-1672356070-10.100.1.36-1412072991411:blk_1120646538_47162789{blockUCState=UNDER_CONSTRUCTION, primaryNodeIndex=-1, replicas=[ReplicaUnderConstruction[[DISK]DS-f4fff5f3-f3fd-4054-a75c-1d7da53a73af:NORMAL|FINALIZED], ReplicaUnderConstruction[[DISK]DS-26f54bc5-5026-4e6a-94ec-8435224e4aa9:NORMAL|RWR], ReplicaUnderConstruction[[DISK]DS-4ab3fffc-6468-47df-8023-79f23a330371:NORMAL|FINALIZED]]}. Target Replicas is 3 but found 2 replica(s).
..........................................................................................
............................Status: HEALTHY
Total size:99186583 B
Total dirs:1
Total files:328
Total symlinks:0
Total blocks (validated):328 (avg. block size 302398 B)
Minimally replicated blocks:328 (100.0 %)
Over-replicated blocks:0 (0.0 %)
Under-replicated blocks:1 (0.30487806 %)
Mis-replicated blocks:0 (0.0 %)
Default replication factor:3
Average block replication:2.996951
Corrupt blocks:0
Missing replicas:1 (0.101626016 %)
Number of data-nodes:59
Number of racks:6
FSCK ended at Thu Aug 06 14:05:03 CST 2015 in 36 milliseconds
  
The filesystem under path '/data/rc/click/mpp/15-08-05/' is HEALTHY
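
  As a side note: a file stuck in OPENFORWRITE can usually be forced closed by asking the NameNode to recover its lease, either with "hdfs debug recoverLease -path <file>" on newer Hadoop releases, or programmatically. A minimal sketch (not from the original post), assuming the Hadoop client libraries and cluster config are on the classpath; the path is the one reported by fsck above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class RecoverOpenFile {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
    Path stuck = new Path("/data/rc/click/mpp/15-08-05/FlumeData.1438758322864");
    FileSystem fs = FileSystem.get(stuck.toUri(), conf);
    if (fs instanceof DistributedFileSystem) {
      // Ask the NameNode to recover the file's lease; true means the file is now closed.
      boolean closed = ((DistributedFileSystem) fs).recoverLease(stuck);
      System.out.println("file closed: " + closed);
    }
  }
}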
  
  The Flume log shows:
  
  [iyunv@10.100.1.117] out: 05 Aug 2015 11:15:19,322 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.hdfs.BucketWriter.open:234) - Creating hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
[iyunv@10.100.1.117] out: 05 Aug 2015 11:16:20,493 INFO  [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter$5.call:429)  - Closing idle bucketWriter hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp at 1438744580493
[iyunv@10.100.1.117] out: 05 Aug 2015 11:16:20,497 INFO  [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:363)  - Closing hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
[iyunv@10.100.1.117] out: 05 Aug 2015 11:16:30,501 WARN  [hdfs-sin_hdfs_201-roll-timer-0] (org.apache.flume.sink.hdfs.BucketWriter.close:370)  - failed to close() HDFSWriter for file (hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp). Exception follows.
[iyunv@10.100.1.117] out: java.io.IOException: Callable timed out after 10000 ms on file: hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp
[iyunv@10.100.1.117] out: 05 Aug 2015 11:16:30,503 INFO  [hdfs-sin_hdfs_201-call-runner-7] (org.apache.flume.sink.hdfs.BucketWriter$8.call:629)  - Renaming hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293.tmp to hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-08-05/FlumeData.1438744519293
  The HDFS file close operation failed because it timed out.
  Looking at the source of BucketWriter.close():



public synchronized void close(boolean callCloseCallback)
    throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  try {
    flush();
  } catch (IOException e) {
    LOG.warn("pre-close flush failed", e);
  }
  boolean failedToClose = false;
  LOG.info("Closing {}", bucketPath);
  CallRunner<Void> closeCallRunner = createCloseCallRunner();
  if (isOpen) {
    try {
      callWithTimeout(closeCallRunner);
      sinkCounter.incrementConnectionClosedCount();
    } catch (IOException e) {
      LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
          "). Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      failedToClose = true;
    }
    isOpen = false;
  } else {
    LOG.info("HDFSWriter is already closed: {}", bucketPath);
  }
  // NOTE: timed rolls go through this codepath as well as other roll types
  if (timedRollFuture != null && !timedRollFuture.isDone()) {
    timedRollFuture.cancel(false); // do not cancel myself if running!
    timedRollFuture = null;
  }
  if (idleFuture != null && !idleFuture.isDone()) {
    idleFuture.cancel(false); // do not cancel myself if running!
    idleFuture = null;
  }
  if (bucketPath != null && fileSystem != null) {
    // could block or throw IOException
    try {
      renameBucket(bucketPath, targetPath, fileSystem);
    } catch (Exception e) {
      LOG.warn("failed to rename() file (" + bucketPath +
          "). Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      final Callable<Void> scheduledRename = createScheduledRenameCallable();
      timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
    }
  }
  if (callCloseCallback) {
    runCloseAction();
    closed = true;
  }
}
  
  The default call timeout is 10000 ms, and a failed close is not retried. The code sets a failedToClose variable but never reads it; the developers apparently forgot to handle that case.
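
  For context, the "Callable timed out after 10000 ms" message in the log comes from running the HDFS call on a pool thread and bounding the wait on its Future. A minimal sketch of that pattern, with simplified names rather than the exact Flume internals:

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedCaller {
  // Run the call on a worker thread; give up after callTimeout milliseconds.
  static <T> T callWithTimeout(ExecutorService pool, long callTimeout,
      Callable<T> call) throws IOException, InterruptedException {
    Future<T> future = pool.submit(call);
    try {
      return future.get(callTimeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupt the stuck HDFS call
      throw new IOException("Callable timed out after " + callTimeout + " ms", e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    }
  }
}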
  Workarounds:
  1. Raise the call timeout, e.g. to 5 minutes (300000 ms). The Flume HDFS sink configuration then looks like this:



agent12.sinks.sin_hdfs_201.type=hdfs
agent12.sinks.sin_hdfs_201.channel=ch_hdfs_201
agent12.sinks.sin_hdfs_201.hdfs.path=hdfs://com.hunantv.hadoopnamenode:8020/data/logs/amobile/vod/15-%{month}-%{day}
agent12.sinks.sin_hdfs_201.hdfs.round=true
agent12.sinks.sin_hdfs_201.hdfs.roundValue=10
agent12.sinks.sin_hdfs_201.hdfs.roundUnit=minute
agent12.sinks.sin_hdfs_201.hdfs.fileType=DataStream
agent12.sinks.sin_hdfs_201.hdfs.writeFormat=Text
agent12.sinks.sin_hdfs_201.hdfs.rollInterval=0
agent12.sinks.sin_hdfs_201.hdfs.rollSize=209715200
agent12.sinks.sin_hdfs_201.hdfs.rollCount=0
agent12.sinks.sin_hdfs_201.hdfs.idleTimeout=300
agent12.sinks.sin_hdfs_201.hdfs.batchSize=100
agent12.sinks.sin_hdfs_201.hdfs.minBlockReplicas=1
agent12.sinks.sin_hdfs_201.hdfs.callTimeout=300000
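
Note the mixed units in this config: hdfs.callTimeout is in milliseconds (300000 ms = 5 minutes), while hdfs.idleTimeout is in seconds and hdfs.rollSize is in bytes (209715200 = 200 MB), so it is easy to set one of them off by a factor of a thousand.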

  
  
  2. Modify the source to retry the close, for example:



public synchronized void close(boolean callCloseCallback)
    throws IOException, InterruptedException {
  checkAndThrowInterruptedException();
  try {
    flush();
  } catch (IOException e) {
    LOG.warn("pre-close flush failed", e);
  }
  boolean failedToClose = false;
  LOG.info("Closing {}", bucketPath);
  CallRunner<Void> closeCallRunner = createCloseCallRunner();
  int tryTime = 1;
  while (isOpen && tryTime <= 5) {
    failedToClose = false; // reset each attempt, or one failure loops forever
    try {
      callWithTimeout(closeCallRunner);
      sinkCounter.incrementConnectionClosedCount();
    } catch (IOException e) {
      LOG.warn("failed to close() HDFSWriter for file (try times:" + tryTime +
          "): " + bucketPath + ". Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      failedToClose = true;
    }
    if (failedToClose) {
      isOpen = true;
      tryTime++;
      Thread.sleep(this.callTimeout); // back off before the next attempt
    } else {
      isOpen = false;
    }
  }
  // still open after exhausting all retries
  if (isOpen) {
    LOG.error("failed to close file: " + bucketPath + " after " +
        (tryTime - 1) + " tries.");
  } else {
    LOG.info("HDFSWriter is closed: {}", bucketPath);
  }
  // NOTE: timed rolls go through this codepath as well as other roll types
  if (timedRollFuture != null && !timedRollFuture.isDone()) {
    timedRollFuture.cancel(false); // do not cancel myself if running!
    timedRollFuture = null;
  }
  if (idleFuture != null && !idleFuture.isDone()) {
    idleFuture.cancel(false); // do not cancel myself if running!
    idleFuture = null;
  }
  if (bucketPath != null && fileSystem != null) {
    // could block or throw IOException
    try {
      renameBucket(bucketPath, targetPath, fileSystem);
    } catch (Exception e) {
      LOG.warn("failed to rename() file (" + bucketPath +
          "). Exception follows.", e);
      sinkCounter.incrementConnectionFailedCount();
      final Callable<Void> scheduledRename = createScheduledRenameCallable();
      timedRollerPool.schedule(scheduledRename, retryInterval, TimeUnit.SECONDS);
    }
  }
  if (callCloseCallback) {
    runCloseAction();
    closed = true;
  }
}
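
  One caveat about this retry sketch: each failed attempt sleeps for callTimeout on the thread that invoked close() (the hdfs-*-roll-timer thread in the log above), so with the 5-minute callTimeout configured in workaround 1, a permanently stuck file could tie that pool thread up for as long as 25 minutes. Bounding the loop at five attempts keeps the blocking finite.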

  


  
