[Experience Share] [Flume] A custom Kafka sink: building and packaging the jar, and fixing the "unapproved license" build error

[复制链接]

尚未签到

Posted on 2015-11-27 18:59:50
  

Below is the pom file for my custom Kafka sink plugin. Build it into a jar, drop it into Flume's lib directory, and it is ready to use.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  
  <groupId>flume-sinks</groupId>
  <artifactId>cmcc-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>

</project>

The parent section and the rat plugin are removed here, which avoids the common "unapproved license" build failure described in https://issues.apache.org/jira/browse/FLUME-1372.


A few constants are defined (they are referenced as Constants.* in the sink code below):
  

// Assumed to live in a Constants class in the same package as the sink.
package org.apache.flume.cmcc.kafka;

public class Constants {
    public static final String BATCH_SIZE = "batchSize";
    public static final int DEFAULT_BATCH_SIZE = 100;
    public static final String PARTITION_KEY_NAME = "cmcc.partition.key";
    public static final String ENCODING_KEY_NAME = "cmcc.encoding";
    public static final String DEFAULT_ENCODING = "UTF-8";
    public static final String CUSTOME_TOPIC_KEY_NAME = "cmcc.topic.name";
    public static final String DEFAULT_TOPIC_NAME = "CMCC";
}


  
  


A custom sink needs to extend AbstractSink and implement the Configurable interface, overriding a few of its methods, as follows:
package org.apache.flume.cmcc.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;

public class CmccKafkaSink extends AbstractSink implements Configurable {

    private static final Logger log = LoggerFactory
            .getLogger(CmccKafkaSink.class);

    private Properties parameters;
    private Producer<String, String> producer;
    private int batchSize; // number of events per transaction, committed as one batch
    private List<KeyedMessage<String, String>> messageList;
    private SinkCounter sinkCounter;

    @Override
    public Status process() {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        try {
            long processedEvent = 0;
            transaction = channel.getTransaction();
            transaction.begin(); // start the transaction
            messageList.clear();
            for (; processedEvent < batchSize; processedEvent++) {
                event = channel.take(); // take one event from the channel
                if (event == null) {
                    result = Status.BACKOFF;
                    break;
                }
                sinkCounter.incrementEventDrainAttemptCount();
                String partitionKey = parameters
                        .getProperty(Constants.PARTITION_KEY_NAME);
                String topic = StringUtils.defaultIfEmpty(parameters
                        .getProperty(Constants.CUSTOME_TOPIC_KEY_NAME),
                        Constants.DEFAULT_TOPIC_NAME);
                String encoding = StringUtils.defaultIfEmpty(
                        parameters.getProperty(Constants.ENCODING_KEY_NAME),
                        Constants.DEFAULT_ENCODING);
                byte[] eventBody = event.getBody();
                String eventData = new String(eventBody, encoding);
                KeyedMessage<String, String> data = null;
                if (StringUtils.isEmpty(partitionKey)) {
                    data = new KeyedMessage<String, String>(topic, eventData);
                } else {
                    data = new KeyedMessage<String, String>(topic,
                            partitionKey, eventData);
                }
                messageList.add(data);
                log.debug("Add data [" + eventData
                        + "] into messageList, position: " + processedEvent);
            }
            if (processedEvent == 0) {
                sinkCounter.incrementBatchEmptyCount();
                result = Status.BACKOFF;
            } else {
                if (processedEvent < batchSize) {
                    sinkCounter.incrementBatchUnderflowCount();
                } else {
                    sinkCounter.incrementBatchCompleteCount();
                }
                sinkCounter.addToEventDrainAttemptCount(processedEvent);
                producer.send(messageList);
                log.debug("Send MessageList to Kafka: [ message List size = "
                        + messageList.size() + ", processedEvent number = "
                        + processedEvent + " ]");
            }
            transaction.commit(); // batchSize events handled, commit the transaction once
            sinkCounter.addToEventDrainSuccessCount(processedEvent);
        } catch (Exception e) {
            String errorMsg = "Failed to publish events !";
            log.error(errorMsg, e);
            result = Status.BACKOFF;
            if (transaction != null) {
                try {
                    transaction.rollback();
                    log.debug("transaction rollback success !");
                } catch (Exception ex) {
                    log.error(errorMsg, ex);
                    throw Throwables.propagate(ex);
                }
            }
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }
        return result;
    }

    @Override
    public synchronized void start() {
        log.info("Starting {}...", this);
        sinkCounter.start();
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
        sinkCounter.incrementConnectionCreatedCount();
    }

    @Override
    public synchronized void stop() {
        log.debug("Cmcc Kafka sink {} stopping...", getName());
        sinkCounter.stop();
        producer.close();
        sinkCounter.incrementConnectionClosedCount();
        super.stop();
    }

    @Override
    public void configure(Context context) {
        ImmutableMap<String, String> props = context.getParameters();
        batchSize = context.getInteger(Constants.BATCH_SIZE,
                Constants.DEFAULT_BATCH_SIZE);
        messageList = new ArrayList<KeyedMessage<String, String>>(batchSize);
        parameters = new Properties();
        for (String key : props.keySet()) {
            String value = props.get(key);
            this.parameters.put(key, value);
        }
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
    }
}
The sink above also hooks into Flume's built-in monitoring through SinkCounter.






  
To improve performance a batchSize was introduced, so events are committed once per batch rather than once per event, which cuts down the number of transaction commits.
If the channel runs empty before the batch is full, whatever events have already been taken are sent and committed.
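To see that batch-and-commit behaviour in isolation, here is a minimal smoke-test sketch of my own (an assumption, not part of the original post): it wires the sink to an in-memory channel, and it assumes a Kafka broker is reachable at the broker address used in the configuration further down.

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

public class CmccKafkaSinkSmokeTest {
    public static void main(String[] args) throws Exception {
        // In-memory channel standing in for the agent's real channel.
        Channel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        channel.start();

        // Same keys as the Flume configuration below, minus the "a1.sinks.k1." prefix.
        Context ctx = new Context();
        ctx.put("batchSize", "100");
        ctx.put("metadata.broker.list", "192.168.11.174:9092"); // assumes a reachable broker
        ctx.put("serializer.class", "kafka.serializer.StringEncoder");
        ctx.put("cmcc.topic.name", "CMCC");

        CmccKafkaSink sink = new CmccKafkaSink();
        Configurables.configure(sink, ctx);
        sink.setChannel(channel);
        sink.start();

        // Put a single event into the channel inside its own transaction.
        Transaction tx = channel.getTransaction();
        tx.begin();
        channel.put(EventBuilder.withBody("hello".getBytes("UTF-8")));
        tx.commit();
        tx.close();

        // One process() call drains up to batchSize events from the channel
        // and commits them to Kafka in a single transaction.
        Sink.Status status = sink.process();
        System.out.println("sink.process() returned " + status);

        sink.stop();
        channel.stop();
    }
}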
  
Now the configuration:
  

a1.sinks.k1.type=org.apache.flume.cmcc.kafka.CmccKafkaSink
a1.sinks.k1.metadata.broker.list=192.168.11.174:9092
a1.sinks.k1.partition.key=0
a1.sinks.k1.partitioner.class=org.apache.flume.cmcc.kafka.CmccPartition
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks=0
a1.sinks.k1.max.message.size=1000000
a1.sinks.k1.cmcc.encoding=UTF-8
a1.sinks.k1.cmcc.topic.name=CMCC
a1.sinks.k1.producer.type=sync
a1.sinks.k1.batchSize=100

Notice that some of these properties are never defined in Constants, so how do they get read? Flume strips the a1.sinks.k1. prefix, so context.getParameters() already contains keys such as metadata.broker.list, and configure() copies every entry into the Properties object that is later handed to the Kafka producer. A look at the Kafka source makes this clear:
  

// Decompiled from kafka.producer.ProducerConfig (Kafka 0.8.x)
private ProducerConfig(VerifiableProperties props)
{
    this.props = props;
    super();
    kafka.producer.async.AsyncProducerConfig.class.$init$(this);
    SyncProducerConfigShared.class.$init$(this);
    brokerList = props.getString("metadata.broker.list");
    partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner");
    producerType = props.getString("producer.type", "sync");
    String prop;
    compressionCodec = liftedTree1$1(prop = props.getString("compression.codec", NoCompressionCodec$.MODULE$.name()));
    Object _tmp = null;
    compressedTopics = Utils$.MODULE$.parseCsvList(props.getString("compressed.topics", null));
    messageSendMaxRetries = props.getInt("message.send.max.retries", 3);
    retryBackoffMs = props.getInt("retry.backoff.ms", 100);
    topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000);
    ProducerConfig$.MODULE$.validate(this);
}
So when ProducerConfig is instantiated, the Kafka code reads the producer settings straight from the Properties it is given, falling back to the defaults shown above for anything not set.
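To make that pass-through concrete, here is a minimal standalone sketch of my own (not from the original post; broker address and topic are placeholders) using the same 0.8 producer API the sink relies on. Only metadata.broker.list is mandatory; everything else falls back to the defaults visible in the decompiled constructor above.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerConfigDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.11.174:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");
        props.put("request.required.acks", "0");

        ProducerConfig config = new ProducerConfig(props); // settings are read and validated here
        Producer<String, String> producer = new Producer<String, String>(config);

        producer.send(new KeyedMessage<String, String>("CMCC", "hello from plain producer"));
        producer.close();
    }
}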
  
