在flume中,原有的sink的hdfs接口只能根据写入了多少秒或者写入了多少量来关闭文档,并不能根据实际需求来关闭文档,所以需要自己改写代码来满足我们的要求。例如,我们需要它在每天到了0点以后关闭上一日的文档,并再写一个新的文档出来。
首先我找到源码
在flume的源码中,有一个单独的项目flume-hdfs-sink是针对hdfs的,可以直接在这个项目里进行修改,然后打包好再放到lib目录中。
修改的类org.apache.flume.sink.hdfs.HDFSEventSink
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink.hdfs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class HDFSEventSink extends AbstractSink implements Configurable {
/**
 * Callback that is handed a file path.
 *
 * <p>The call sites are outside this excerpt; implementations receive the
 * path as a plain string and return nothing.
 */
public interface WriterCallback {
  /**
   * Invoked with the path of the file the callback concerns.
   *
   * @param filePath path of the file
   */
  void run(String filePath);
}
// Class-wide logger.
private static final Logger LOG = LoggerFactory
.getLogger(HDFSEventSink.class);
// Separator taken from the local JVM's "file.separator" system property.
// NOTE(review): not final, and platform dependent -- confirm this is the
// separator wanted when building HDFS paths.
private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
// Fallback for "hdfs.rollInterval" when the key is not configured.
private static final long defaultRollInterval = 30;
// Fallback for "hdfs.rollSize" when the key is not configured.
private static final long defaultRollSize = 1024;
// Fallback for "hdfs.rollCount" when the key is not configured.
private static final long defaultRollCount = 10;
// Fallback for "hdfs.filePrefix".
private static final String defaultFileName = "FlumeData";
// Fallback for "hdfs.fileSuffix" (empty: no suffix).
private static final String defaultSuffix = "";
// Fallback for "hdfs.inUsePrefix" (empty: no prefix).
private static final String defaultInUsePrefix = "";
// Fallback for "hdfs.inUseSuffix".
private static final String defaultInUseSuffix = ".tmp";
// Fallback for "hdfs.batchSize".
private static final long defaultBatchSize = 100;
// Fallback for "hdfs.fileType".
private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
// Fallback for "hdfs.maxOpenFiles".
private static final int defaultMaxOpenFiles = 5000;
// Time between close retries, in seconds
private static final long defaultRetryInterval = 180;
// Retry forever. Fallback for "hdfs.closeTries".
private static final int defaultTryCount = Integer.MAX_VALUE;
/**
* Default length of time we wait for blocking BucketWriter calls
* before timing out the operation. Intended to prevent server hangs.
* Fallback for "hdfs.callTimeout".
*/
private static final long defaultCallTimeout = 10000;
/**
* Default number of threads available for tasks
* such as append/open/close/flush with hdfs.
* These tasks are done in a separate thread in
* the case that they take too long. In which
* case we create a new file and move on.
* Fallback for "hdfs.threadsPoolSize".
*/
private static final int defaultThreadPoolSize = 10;
// Fallback for "hdfs.rollTimerPoolSize".
private static final int defaultRollTimerPoolSize = 1;
// Factory supplied through the constructor.
private final HDFSWriterFactory writerFactory;
// Open writers, held in the access-ordered WriterLinkedHashMap declared below.
private WriterLinkedHashMap sfWriters;
// Roll thresholds read in configure() from hdfs.rollInterval / hdfs.rollSize /
// hdfs.rollCount.
private long rollInterval;
private long rollSize;
private long rollCount;
// Read from hdfs.batchSize.
private long batchSize;
// Pool sizes read from hdfs.threadsPoolSize and hdfs.rollTimerPoolSize.
private int threadsPoolSize;
private int rollTimerPoolSize;
private CompressionCodec codeC;
private CompressionType compType;
// Read from hdfs.fileType.
private String fileType;
// Read from hdfs.path (required).
private String filePath;
// Read from hdfs.filePrefix.
private String fileName;
// Read from hdfs.fileSuffix.
private String suffix;
// Read from hdfs.inUsePrefix / hdfs.inUseSuffix.
private String inUsePrefix;
private String inUseSuffix;
// Built from hdfs.timeZone; null when that key is absent.
private TimeZone timeZone;
// Read from hdfs.maxOpenFiles.
private int maxOpenFiles;
private ExecutorService callTimeoutPool;
private ScheduledExecutorService timedRollerPool;
private boolean needRounding = false;
private int roundUnit = Calendar.SECOND;
private int roundValue = 1;
private boolean useLocalTime = false;
// Read from hdfs.callTimeout.
private long callTimeout;
// The Context passed to configure(), kept for later use.
private Context context;
private SinkCounter sinkCounter;
// Read from hdfs.idleTimeout; defaults to 0.
private volatile int idleTimeout;
private Clock clock;
// NOTE(review): mockFs / mockWriter look like test hooks -- usage is not
// visible in this excerpt.
private FileSystem mockFs;
private HDFSWriter mockWriter;
// NOTE(review): presumably guards sfWriters -- confirm against the code that
// uses it.
private final Object sfWritersLock = new Object();
private long retryInterval;
// Read from hdfs.closeTries.
private int tryCount;
private PrivilegedExecutor privExecutor;
/**
* Roll files according to the file name prefix.
* Read from hdfs.rollName; defaults to false.
*/
private boolean rollName = false;
/**
 * Extended {@link LinkedHashMap} used as an LRU queue of open file handles.
 *
 * <p>The map is access-ordered, so the "eldest" entry is the least recently
 * used writer. Once the map holds more than {@code maxOpenFiles} entries, the
 * eldest writer is closed and evicted.
 *
 * <p>The type arguments are spelled out so that {@code eldest.getValue()} is
 * a {@code BucketWriter}; with the raw types the {@code close()} call would
 * resolve against {@code Object} and fail to compile.
 */
private static class WriterLinkedHashMap
    extends LinkedHashMap<String, BucketWriter> {
  /** Maximum number of writers kept open at the same time. */
  private final int maxOpenFiles;

  /**
   * @param maxOpenFiles number of entries the map may hold before the least
   *     recently used one is closed and dropped
   */
  public WriterLinkedHashMap(int maxOpenFiles) {
    super(16, 0.75f, true); // stock initial capacity/load, access ordering
    this.maxOpenFiles = maxOpenFiles;
  }

  /**
   * Closes and evicts the least recently used writer when the map has grown
   * past {@code maxOpenFiles}.
   *
   * <p>A failure to close is logged and the entry is evicted anyway, so one
   * bad writer cannot make the map grow without bound.
   *
   * @param eldest the least recently accessed entry
   * @return {@code true} if {@code eldest} should be removed from the map
   */
  @Override
  protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
    if (size() <= maxOpenFiles) {
      return false;
    }
    // More than the allowed number of open files: close the eldest one.
    try {
      eldest.getValue().close();
    } catch (IOException e) {
      LOG.warn(eldest.getKey(), e);
    } catch (InterruptedException e) {
      LOG.warn(eldest.getKey(), e);
      // Restore the interrupt flag so callers further up can react to it.
      Thread.currentThread().interrupt();
    }
    return true;
  }
}
/**
 * Creates a sink that uses a newly constructed {@link HDFSWriterFactory}.
 */
public HDFSEventSink() {
this(new HDFSWriterFactory());
}
/**
 * Creates a sink that uses the supplied writer factory.
 *
 * @param writerFactory factory to store; it is kept as given, with no null
 *     check performed here
 */
public HDFSEventSink(HDFSWriterFactory writerFactory) {
this.writerFactory = writerFactory;
}
/**
 * Exposes the map of open writers so that tests can inspect it.
 *
 * <p>The return type carries its type arguments instead of being a raw
 * {@code Map}, so callers do not need unchecked casts.
 *
 * @return the live {@code sfWriters} map itself, not a copy; may be
 *     {@code null} if the field has not been assigned yet
 */
@VisibleForTesting
Map<String, BucketWriter> getSfWriters() {
  return sfWriters;
}
// read configuration and setup thresholds
@Override
public void configure(Context context) {
this.context = context;
filePath = Preconditions.checkNotNull(
context.getString("hdfs.path"), "hdfs.path is required");
fileName = context.getString("hdfs.filePrefix", defaultFileName);
this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
String tzName = context.getString("hdfs.timeZone");
timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
rollName = context.getBoolean("hdfs.rollName", false);
batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
String codecName = context.getString("hdfs.codeC");
fileType = context.getString("hdfs.fileType", defaultFileType);
maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
defaultThreadPoolSize);
rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
defaultRollTimerPoolSize);
String kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal");
String kerbKeytab = context.getString("hdfs.kerberosKeytab");
String proxyUser = context.getString("hdfs.proxyUser");
tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
if(tryCount 0 && roundValue 0 && roundValue