sxyzy 发表于 2015-9-17 07:15:58

Flume Spool Source 源码过程分析(未运行)

  主要涉及到的类:
  


SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中。SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取。文件切换等操作,都是reader去实现

内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取)






1 @Override
2   public void run() {
3       int backoffInterval = 250;
4       try {
5         while (!Thread.interrupted()) {
6         List<Event> events = reader.readEvents(batchSize);
7         if (events.isEmpty()) {
8             break;
9         }
10         sourceCounter.addToEventReceivedCount(events.size());
11         sourceCounter.incrementAppendBatchReceivedCount();
12
13         try {
14             getChannelProcessor().processEventBatch(events);
15             reader.commit();
16         } catch (ChannelException ex) {
17             logger.warn("The channel is full, and cannot write data now. The " +
18               "source will try again after " + String.valueOf(backoffInterval) +
19               " milliseconds");
20             hitChannelException = true;
21             if (backoff) {
22               TimeUnit.MILLISECONDS.sleep(backoffInterval);
23               backoffInterval = backoffInterval << 1;
24               backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
25                                 backoffInterval;
26             }
27             continue;
28         }
29         backoffInterval = 250;
30         sourceCounter.addToEventAcceptedCount(events.size());
31         sourceCounter.incrementAppendBatchAcceptedCount();
32         }
33         logger.info("Spooling Directory Source runner has shutdown.");
34       } catch (Throwable t) {
35         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
36             "Uncaught exception in SpoolDirectorySource thread. " +
37             "Restart or reconfigure Flume to continue processing.", t);
38         hasFatalError = true;
39         Throwables.propagate(t);
40       }
41   }
  




ReliableSpoolingFileEventReader 定义在SpoolDirectorySource中的reader。看这个名字就知道碉堡了,reliable的,怎么实现reliable的??

reader的readEvent方法,会根据batchSize大小读取指定的Event



该方法的大致意思:

如果没有提交,如果当前文件空,错,否则获取EventDeserializer

如果已经提交,如果当前文件空,则获得下一个文件,之后,如果文件还是空,则返回空Event列表。



之后,调用EventDeserializer的readEvents。






1public List<Event> readEvents(int numEvents) throws IOException {
2   if (!committed) {
3       if (!currentFile.isPresent()) {
4         throw new IllegalStateException("File should not roll when " +
5             "commit is outstanding.");
6       }
7       logger.info("Last read was never committed - resetting mark position.");
8       currentFile.get().getDeserializer().reset();
9   } else {
10       // Check if new files have arrived since last call
11       if (!currentFile.isPresent()) {
12         currentFile = getNextFile();
13       }
14       // Return empty list if no new files
15       if (!currentFile.isPresent()) {
16         return Collections.emptyList();
17       }
18   }
19
20   EventDeserializer des = currentFile.get().getDeserializer();
21   List<Event> events = des.readEvents(numEvents);
22
23   /* It's possible that the last read took us just up to a file boundary.
24      * If so, try to roll to the next file, if there is one. */
25   if (events.isEmpty()) {
26       retireCurrentFile();
27       currentFile = getNextFile();
28       if (!currentFile.isPresent()) {
29         return Collections.emptyList();
30       }
31       events = currentFile.get().getDeserializer().readEvents(numEvents);
32   }
33
34   //写入header值,略47
48   committed = false;
49   lastFileRead = currentFile;
50   return events;
51   }
  在这个方法中,我们看到了
  currentFile:该对象采用了谷歌的Optional进行封装,更加容易判断空指针等等。Optional<FileInfo>,该FileInfo封装了普通的File对象和针对该file对象的EventDeserializer(事件序列器)
  该currentFile主要在ReliableSpoolingFileEventReader 类中的Optional<FileInfo> openFile(File file),Optional<FileInfo> getNextFile() 方法中调用。
  
  EventDeserializer:事件序列器的主要作用在于定义一些读取的基本操作

  其中mark是读取的行position进行标记
  
  EventDeserializer的实现子类,很多,这里只讲LineDeserializer,顾名思义,按照行去读取,一行就是一个Event
  虽然EventDeserializer已经涉及到读取行了,但是真正读取记录的还不是他。
  
  我们看openfile函数中



1 String nextPath = file.getPath();
2       PositionTracker tracker =
3         DurablePositionTracker.getInstance(metaFile, nextPath);
4       if (!tracker.getTarget().equals(nextPath)) {
5         tracker.close();
6         deleteMetaFile();
7         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
8       }
15       ResettableInputStream in =
16         new ResettableFileInputStream(file, tracker,
17               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
18               decodeErrorPolicy);
19       EventDeserializer deserializer = EventDeserializerFactory.getInstance
20         (deserializerType, deserializerContext, in);
  
  
  因此可以看出EventDeserializer读取记录是靠 ResettableFileInputStream(in对象),ResettableFileInputStream的初始化需要File类和一个DurablePositionTracker,
  因此,ResettableFileInputStream在读取File内容同时,使用DurablePositionTracker去记录position的信息。
  DurablePositionTracker使用了apache avro来进行持久化
  private final DataFileWriter<TransferStateFileMeta> writer;
private final DataFileReader<TransferStateFileMeta> reader;
  
  这样,当我们使用EventDeserializer读取一个event的时候,就会从当前文件流中获取信息,同时也能够记录读取的位置信息。
  当读取batchsize数量的event都正确处理后,ReliableSpoolingFileEventReader 会commit(),持久化位置信息



public void commit() throws IOException {
if (!committed && currentFile.isPresent()) {
currentFile.get().getDeserializer().mark();
committed = true;
}
}
  
  这里的mark方法,调用
  LineDeserializer的



@Override
public void mark() throws IOException {
ensureOpen();
in.mark();
}
  
  在调用ResettableFileInputStream(in)的mark方法



@Override
public void mark() throws IOException {
tracker.storePosition(tell());
}
  
  之后调用位置tracker的storePostition方法(DurablePositionTracker)



@Override
public synchronized void storePosition(long position) throws IOException {
metaCache.setOffset(position);
writer.append(metaCache);
writer.sync();
writer.flush();
}
  
  之后,调用avro的DataFileWriter,完成写入操作。
  
  最后,至于postition位置的持久化逻辑判断,基本也能猜到,当出现trash时候,从未读取的地方开始读取,等等,所以说,是ResettableFileInputStream的输入流,因为他能够读取信息,也能持久化读取的信息位置。
  
  
页: [1]
查看完整版本: Flume Spool Source 源码过程分析(未运行)