设为首页 收藏本站
查看: 1412|回复: 0

[经验分享] flume 自定义sink

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-9-17 07:42:29 | 显示全部楼层 |阅读模式
  http://flume.apache.org/FlumeDeveloperGuide.html#sink

  看了 还是比较好上手的,简单翻译一下

  sink的作用是从 Channel 提取 Event 然后传给流中的下一个 Flume Agent或者把它们存储在外部的仓库中。在Flume的配置文件中,一个 Sink 和一个唯一的 Channel 关联。有一个 SinkRunner 实例与每一个配好的 Sink 关联,当 Flume 框架调用 SinkRunner 的 start() 方法时,就创建一个新的线程来驱动这个 Sink (使用  SinkRunner 的实现Runnable接口的 PollingRunner 内部静态类来运行)。这个线程管理了 Sink 的生命周期。 Sink 需要实现 start() 和 stop() 方法。Sink 的 start() 方法需要初始化 Sink 并使它能够达到向目的地发送 Event 的状态。 Sink 的 process() 方法是处理从 Channel 传过来的 Event 和 发送 Event 的核心方法。 Sink 的 Stop() 方法需要做必要的清理工作(比如释放某些资源)。 Sink 也需要实现 Configurable 接口来处理自己的一些配置。

  官网也给出了模板类:



1 public class MySink extends AbstractSink implements Configurable {
2     private String myProp;
3
4     @Override
5     public void configure(Context context) {
6         String myProp = context.getString("myProp", "defaultValue");
7
8         // Process the myProp value (e.g. validation)
9
10         // Store myProp for later retrieval by process() method
11         this.myProp = myProp;
12     }
13
14     @Override
15     public void start() {
16         // Initialize the connection to the external repository (e.g. HDFS) that
17         // this Sink will forward Events to ..
18     }
19
20     @Override
21     public void stop() {
22         // Disconnect from the external respository and do any
23         // additional cleanup (e.g. releasing resources or nulling-out
24         // field values) ..
25     }
26
27     @Override
28     public Status process() throws EventDeliveryException {
29         Status status = null;
30
31         // Start transaction
32         Channel ch = getChannel();
33         Transaction txn = ch.getTransaction();
34         txn.begin();
35
36         try {
37             // This try clause includes whatever Channel operations you want to do
38             Event event = ch.take();
39
40             // Send the Event to the external repository.
41             // storeSomeData(e);
42             txn.commit();
43             status = Status.READY;
44         } catch (Throwable t) {
45             txn.rollback();
46
47             // Log exception, handle individual exceptions as needed
48             status = Status.BACKOFF;
49
50             // re-throw all Errors
51             if (t instanceof Error) {
52                 throw (Error) t;
53             }
54         } finally {
55             txn.close();
56         }
57
58         return status;
59     }
60 }
  

  拿来模板直接填充自己的逻辑代码即可,详细就可以直接参考HDFSSink或者HBaseSink等

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114588-1-1.html 上篇帖子: Flume Source 下篇帖子: flume-ng 自定义sink消费flume source
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表