设为首页 收藏本站
查看: 1829|回复: 0

[经验分享] Flume Spool Source 源码过程分析(未运行)

[复制链接]

尚未签到

发表于 2015-9-17 07:15:58 | 显示全部楼层 |阅读模式
  主要涉及到的类:
  


SpoolDirectorySource 读取用户配置,并按照batchSize去读取这么多量的Event从用户指定的Spooling Dir中。SpoolDirectorySource 不会去读取某一个具体的文件,而是通过内部的reader去读取。文件切换等操作,都是reader去实现

内部类:SpoolDirectoryRunnable是一个线程,其中的run方法,完成从Spooling Dir读取Event(使用reader去读取)






1 @Override
2     public void run() {
3       int backoffInterval = 250;
4       try {
5         while (!Thread.interrupted()) {
6           List<Event> events = reader.readEvents(batchSize);
7           if (events.isEmpty()) {
8             break;
9           }
10           sourceCounter.addToEventReceivedCount(events.size());
11           sourceCounter.incrementAppendBatchReceivedCount();
12
13           try {
14             getChannelProcessor().processEventBatch(events);
15             reader.commit();
16           } catch (ChannelException ex) {
17             logger.warn("The channel is full, and cannot write data now. The " +
18               "source will try again after " + String.valueOf(backoffInterval) +
19               " milliseconds");
20             hitChannelException = true;
21             if (backoff) {
22               TimeUnit.MILLISECONDS.sleep(backoffInterval);
23               backoffInterval = backoffInterval << 1;
24               backoffInterval = backoffInterval >= maxBackoff ? maxBackoff :
25                                 backoffInterval;
26             }
27             continue;
28           }
29           backoffInterval = 250;
30           sourceCounter.addToEventAcceptedCount(events.size());
31           sourceCounter.incrementAppendBatchAcceptedCount();
32         }
33         logger.info("Spooling Directory Source runner has shutdown.");
34       } catch (Throwable t) {
35         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
36             "Uncaught exception in SpoolDirectorySource thread. " +
37             "Restart or reconfigure Flume to continue processing.", t);
38         hasFatalError = true;
39         Throwables.propagate(t);
40       }
41     }
  




ReliableSpoolingFileEventReader 定义在SpoolDirectorySource中的reader。看这个名字就知道碉堡了,reliable的,怎么实现reliable的??

reader的readEvent方法,会根据batchSize大小读取指定的Event



该方法的大致意思:

如果没有提交,如果当前文件空,错,否则获取EventDeserializer

如果已经提交,如果当前文件空,则获得下一个文件,之后,如果文件还是空,则返回空Event列表。



之后,调用EventDeserializer的readEvents。






1  public List<Event> readEvents(int numEvents) throws IOException {
2     if (!committed) {
3       if (!currentFile.isPresent()) {
4         throw new IllegalStateException("File should not roll when " +
5             "commit is outstanding.");
6       }
7       logger.info("Last read was never committed - resetting mark position.");
8       currentFile.get().getDeserializer().reset();
9     } else {
10       // Check if new files have arrived since last call
11       if (!currentFile.isPresent()) {
12         currentFile = getNextFile();
13       }
14       // Return empty list if no new files
15       if (!currentFile.isPresent()) {
16         return Collections.emptyList();
17       }
18     }
19
20     EventDeserializer des = currentFile.get().getDeserializer();
21     List<Event> events = des.readEvents(numEvents);
22
23     /* It's possible that the last read took us just up to a file boundary.
24      * If so, try to roll to the next file, if there is one. */
25     if (events.isEmpty()) {
26       retireCurrentFile();
27       currentFile = getNextFile();
28       if (!currentFile.isPresent()) {
29         return Collections.emptyList();
30       }
31       events = currentFile.get().getDeserializer().readEvents(numEvents);
32     }
33
34     //写入header值,略47
48     committed = false;
49     lastFileRead = currentFile;
50     return events;
51   }
  在这个方法中,我们看到了
  currentFile:该对象采用了谷歌的Optional进行封装,更加容易判断空指针等等。Optional<FileInfo>,该FileInfo封装了普通的File对象和针对该file对象的EventDeserializer(事件序列器)
  该currentFile主要在ReliableSpoolingFileEventReader 类中的Optional<FileInfo> openFile(File file),Optional<FileInfo> getNextFile() 方法中调用。
  
  EventDeserializer:事件序列器的主要作用在于定义一些读取的基本操作
DSC0000.png
  其中mark是读取的行position进行标记
  
  EventDeserializer的实现子类,很多,这里只讲LineDeserializer,顾名思义,按照行去读取,一行就是一个Event
  虽然EventDeserializer已经涉及到读取行了,但是真正读取记录的还不是他。
  
  我们看openfile函数中



1 String nextPath = file.getPath();
2       PositionTracker tracker =
3           DurablePositionTracker.getInstance(metaFile, nextPath);
4       if (!tracker.getTarget().equals(nextPath)) {
5         tracker.close();
6         deleteMetaFile();
7         tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
8       }
15       ResettableInputStream in =
16           new ResettableFileInputStream(file, tracker,
17               ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset,
18               decodeErrorPolicy);
19       EventDeserializer deserializer = EventDeserializerFactory.getInstance
20           (deserializerType, deserializerContext, in);
  
  
  因此可以看出EventDeserializer读取记录是靠 ResettableFileInputStream(in对象),ResettableFileInputStream的初始化需要File类和一个DurablePositionTracker,
  因此,ResettableFileInputStream在读取File内容同时,使用DurablePositionTracker去记录position的信息。
  DurablePositionTracker使用了apache avro来进行持久化
  private final DataFileWriter<TransferStateFileMeta> writer;
  private final DataFileReader<TransferStateFileMeta> reader;
  
  这样,当我们使用EventDeserializer读取一个event的时候,就会从当前文件流中获取信息,同时也能够记录读取的位置信息。
  当读取batchsize数量的event都正确处理后,ReliableSpoolingFileEventReader 会commit(),持久化位置信息



public void commit() throws IOException {
if (!committed && currentFile.isPresent()) {
currentFile.get().getDeserializer().mark();
committed = true;
}
}
  
  这里的mark方法,调用
  LineDeserializer的



@Override
public void mark() throws IOException {
ensureOpen();
in.mark();
}
  
  在调用ResettableFileInputStream(in)的mark方法



@Override
public void mark() throws IOException {
tracker.storePosition(tell());
}
  
  之后调用位置tracker的storePostition方法(DurablePositionTracker)



@Override
public synchronized void storePosition(long position) throws IOException {
metaCache.setOffset(position);
writer.append(metaCache);
writer.sync();
writer.flush();
}
  
  之后,调用avro的DataFileWriter,完成写入操作。
  
  最后,至于postition位置的持久化逻辑判断,基本也能猜到,当出现trash时候,从未读取的地方开始读取,等等,所以说,是ResettableFileInputStream的输入流,因为他能够读取信息,也能持久化读取的信息位置。
  
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114566-1-1.html 上篇帖子: Flume学习——BasicTransactionSemantics 下篇帖子: flume 集群datanode节点失败导致hdfs写失败(转)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表