Flume Spool Source: Source Code Walkthrough (code not actually run)
The main class involved is SpoolDirectorySource. It reads the user's configuration and fetches up to batchSize Events at a time from the user-specified spooling directory. SpoolDirectorySource never reads a concrete file by itself; all reading goes through an internal reader, and operations such as rolling to the next file are implemented by that reader.
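For orientation, here is a minimal agent configuration sketch for a spooling-directory source (the agent/source/channel names and the directory path are placeholders invented for this example; batchSize bounds how many Events one readEvents call may return):

agent1.sources = src1
agent1.channels = ch1
agent1.sources.src1.type = spooldir
agent1.sources.src1.channels = ch1
agent1.sources.src1.spoolDir = /var/log/flume-spool
agent1.sources.src1.batchSize = 100
agent1.sources.src1.deserializer = LINE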
Its inner class SpoolDirectoryRunnable is a Runnable; its run method does the actual work of reading Events from the spooling directory (via the reader):
@Override
public void run() {
  int backoffInterval = 250;
  try {
    while (!Thread.interrupted()) {
      List<Event> events = reader.readEvents(batchSize);
      if (events.isEmpty()) {
        break;
      }
      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();

      try {
        getChannelProcessor().processEventBatch(events);
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full, and cannot write data now. The " +
            "source will try again after " + String.valueOf(backoffInterval) +
            " milliseconds");
        hitChannelException = true;
        if (backoff) {
          TimeUnit.MILLISECONDS.sleep(backoffInterval);
          backoffInterval = backoffInterval << 1;
          backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
              backoffInterval;
        }
        continue;
      }
      backoffInterval = 250;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();
    }
    logger.info("Spooling Directory Source runner has shutdown.");
  } catch (Throwable t) {
    logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
        "Uncaught exception in SpoolDirectorySource thread. " +
        "Restart or reconfigure Flume to continue processing.", t);
    hasFatalError = true;
    Throwables.propagate(t);
  }
}
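The backoff arithmetic above doubles the wait after every failed channel write and clamps it at maxBackoff. A small standalone sketch of that sequence (maxBackoff = 4000 ms is an assumed value picked for this illustration, not read from the source):

public class BackoffDemo {
  public static void main(String[] args) {
    int backoffInterval = 250;    // same starting value as run() above
    final int maxBackoff = 4000;  // assumed cap, for illustration only
    for (int attempt = 1; attempt <= 6; attempt++) {
      System.out.println("attempt " + attempt + ": wait " + backoffInterval + " ms");
      backoffInterval = backoffInterval << 1;
      backoffInterval = Math.min(backoffInterval, maxBackoff);
    }
    // Prints 250, 500, 1000, 2000, 4000, 4000: growth stops at the cap,
    // and run() resets the interval to 250 after the next successful batch.
  }
}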
ReliableSpoolingFileEventReader is the reader defined inside SpoolDirectorySource. The name alone raises the interesting question: it claims to be reliable, so how is that reliability achieved?
The reader's readEvents method reads up to the requested number of Events (batchSize here).
Roughly, the method proceeds as follows:
If the previous batch was not committed: if there is no current file, throw an IllegalStateException (a file must not roll while a commit is outstanding); otherwise reset the current file's EventDeserializer back to the last mark.
If the previous batch was committed: if there is no current file, fetch the next file; if there is still no file after that, return an empty Event list.
Then call readEvents on the current file's EventDeserializer.
public List<Event> readEvents(int numEvents) throws IOException {
  if (!committed) {
    if (!currentFile.isPresent()) {
      throw new IllegalStateException("File should not roll when " +
          "commit is outstanding.");
    }
    logger.info("Last read was never committed - resetting mark position.");
    currentFile.get().getDeserializer().reset();
  } else {
    // Check if new files have arrived since last call
    if (!currentFile.isPresent()) {
      currentFile = getNextFile();
    }
    // Return empty list if no new files
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
  }

  EventDeserializer des = currentFile.get().getDeserializer();
  List<Event> events = des.readEvents(numEvents);

  /* It's possible that the last read took us just up to a file boundary.
   * If so, try to roll to the next file, if there is one. */
  if (events.isEmpty()) {
    retireCurrentFile();
    currentFile = getNextFile();
    if (!currentFile.isPresent()) {
      return Collections.emptyList();
    }
    events = currentFile.get().getDeserializer().readEvents(numEvents);
  }

  // ... set header values on the events (omitted) ...

  committed = false;
  lastFileRead = currentFile;
  return events;
}
In this method we meet:
currentFile: wrapped in Google Guava's Optional, which makes absent-value handling explicit instead of relying on null checks. Its type is Optional<FileInfo>, where FileInfo bundles the plain File object together with the EventDeserializer for that file.
currentFile is mainly assigned in ReliableSpoolingFileEventReader's Optional<FileInfo> openFile(File file) and Optional<FileInfo> getNextFile() methods.
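As a quick refresher on the Guava Optional idiom used here, a standalone sketch (unrelated to Flume's own classes):

import java.io.File;
import com.google.common.base.Optional;

public class OptionalDemo {
  public static void main(String[] args) {
    // absent() replaces null; callers must check isPresent() before get()
    Optional<File> current = Optional.absent();
    System.out.println(current.isPresent());       // false

    current = Optional.of(new File("/tmp/data.log"));
    if (current.isPresent()) {
      System.out.println(current.get().getPath()); // /tmp/data.log
    }
  }
}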
EventDeserializer: the event deserializer's main job is to define the basic read operations;
its mark method records the position (the line) read so far.
EventDeserializer has many implementing subclasses; here we only look at LineDeserializer, which, as the name suggests, reads line by line: one line becomes one Event.
Although EventDeserializer deals in lines, it is still not the component that actually reads the underlying bytes.
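To make "one line is one Event" concrete, here is a simplified, hypothetical line deserializer (invented for this article, not Flume's actual LineDeserializer: it reads from a plain BufferedReader and leaves out the mark/reset machinery that the real class gets from the ResettableInputStream discussed next):

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification: each line of text becomes one "event" (here just a String).
public class SimpleLineDeserializer {
  private final BufferedReader in;

  public SimpleLineDeserializer(BufferedReader in) {
    this.in = in;
  }

  // Mirrors readEvents(numEvents): read up to numEvents lines, stop early at EOF.
  public List<String> readEvents(int numEvents) throws IOException {
    List<String> events = new ArrayList<String>();
    for (int i = 0; i < numEvents; i++) {
      String line = in.readLine();
      if (line == null) {
        break; // end of file reached
      }
      events.add(line);
    }
    return events;
  }
}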
Let's look inside the openFile method:
String nextPath = file.getPath();
PositionTracker tracker =
    DurablePositionTracker.getInstance(metaFile, nextPath);
if (!tracker.getTarget().equals(nextPath)) {
  tracker.close();
  deleteMetaFile();
  tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
}
// ... (lines omitted) ...
ResettableInputStream in =
    new ResettableFileInputStream(file, tracker,
        ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
        decodeErrorPolicy);
EventDeserializer deserializer = EventDeserializerFactory.getInstance(
    deserializerType, deserializerContext, in);
From this we can see that EventDeserializer reads its records through a ResettableFileInputStream (the in object). Constructing a ResettableFileInputStream requires the File and a DurablePositionTracker,
so while ResettableFileInputStream reads the file's contents, it uses the DurablePositionTracker to record position information.
DurablePositionTracker persists that position with Apache Avro:
private final DataFileWriter<TransferStateFileMeta> writer;
private final DataFileReader<TransferStateFileMeta> reader;
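The two fields above show that DurablePositionTracker writes its TransferStateFileMeta records with Avro's DataFileWriter. As an illustration of that mechanism, here is a minimal standalone sketch that appends an offset record to an Avro container file (the PositionMeta schema and the file name are invented for this example; Flume's real TransferStateFileMeta schema differs):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroOffsetDemo {
  public static void main(String[] args) throws Exception {
    // Made-up schema with a single "offset" field, standing in for TransferStateFileMeta.
    Schema schema = SchemaBuilder.record("PositionMeta")
        .fields().requiredLong("offset").endRecord();

    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
    writer.create(schema, new File("position.meta.avro"));

    GenericRecord meta = new GenericData.Record(schema);
    meta.put("offset", 1024L); // pretend we have consumed 1024 bytes
    writer.append(meta);
    writer.flush();            // like storePosition(): append, then force to disk
    writer.close();
  }
}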
This way, every time we read an event through the EventDeserializer, data is pulled from the current file stream and the read position can be recorded at the same time.
Once all batchSize events of a batch have been processed successfully, ReliableSpoolingFileEventReader calls commit() to persist the position:
public void commit() throws IOException {
  if (!committed && currentFile.isPresent()) {
    currentFile.get().getDeserializer().mark();
    committed = true;
  }
}
The mark method here is the deserializer's; in LineDeserializer it is:
@Override
public void mark() throws IOException {
  ensureOpen();
  in.mark();
}
which in turn calls mark() on the ResettableFileInputStream (the in object):
@Override
public void mark() throws IOException {
  tracker.storePosition(tell());
}
which then calls the position tracker's storePosition method (here DurablePositionTracker's):
@Override
public synchronized void storePosition(long position) throws IOException {
  metaCache.setOffset(position);
  writer.append(metaCache);
  writer.sync();
  writer.flush();
}
Finally, Avro's DataFileWriter completes the actual write.
As for how the persisted position is used, it is easy to guess: after a crash, reading resumes from the first position that was never committed. That is exactly why the input stream is a ResettableFileInputStream: it can read the data, and it can also persist, and later restore, how far into the data it has read.
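To sketch what that recovery looks like, the following standalone example reads the last persisted offset back out of the Avro meta file written in the earlier sketch and resumes the data file from that byte position (the file names and the "offset" field are the same assumptions as before; in Flume itself the equivalent seek is performed by ResettableFileInputStream using the tracker's stored position):

import java.io.File;
import java.io.FileInputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ResumeDemo {
  public static void main(String[] args) throws Exception {
    // Scan the meta file; the last record holds the newest committed offset.
    Schema schema = SchemaBuilder.record("PositionMeta")
        .fields().requiredLong("offset").endRecord();
    long offset = 0L;
    DataFileReader<GenericRecord> reader = new DataFileReader<GenericRecord>(
        new File("position.meta.avro"), new GenericDatumReader<GenericRecord>(schema));
    while (reader.hasNext()) {
      offset = (Long) reader.next().get("offset");
    }
    reader.close();

    // Resume the data file from the persisted byte position.
    FileInputStream in = new FileInputStream(new File("data.log"));
    long skipped = in.skip(offset);
    System.out.println("resuming after " + skipped + " bytes");
    // ... continue reading events from here ...
    in.close();
  }
}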