[Experience Sharing] How to Write a Flume-ng-morphline-avro-sink

Posted on 2015-11-27 17:12:01
  Everything below is the result of my own tinkering, and it has been verified by experiment. I don't know whether there is a better way to do this, so corrections are very welcome; I have been studying big data for less than two months.
  For work I am doing pre-research on big data, mainly the technologies around Hadoop-related components and subprojects.

    The product platform under evaluation mainly consists of Hadoop, Flume, Solr, and Spark. The current focus is on the Flume and Solr parts.

    That is: logs collected by Flume are tokenized and passed to Solr to build indexes, while data that a third-party platform has already structured is indexed directly.
  The platform architecture looks roughly like the figure below (a simple diagram I drew in Visio, for reference only; the Flume channels are omitted, and a source and channel between the data processing layer and the data distribution layer are missing):
  [Architecture diagram: data source → collection layer → data processing layer → data distribution layer → business layer]
  Data source: Flume agents and third-party agents are deployed here. The Flume agents mainly collect logs; the third-party agents collect information other than logs.

Collection layer: Flume and a third-party collection platform are deployed. The Flume source receives the log data collected by the Flume agents; the 3rd platform receives the other data collected by the 3rd agents, structures it, and passes it on to the Flume source.

Data processing layer: determines the data type (structured or unstructured), tokenizes unstructured data with a morphline, and emits the result through the morphline-avro-sink.

Data distribution layer: receives the data from the morphline-avro-sink and uses different sinks to hand it to different downstream services; the most important path is sending it to Solr to build indexes.

Business layer: only Solr indexing is covered here.
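
To make the collection-to-processing hop concrete, here is a hedged flume.conf sketch of a collection-tier agent that receives events from the Flume agents over Avro and relays them to the processing layer. All agent, channel, host, and port names are placeholders; the processing-tier configuration for the custom sink itself is sketched after the MorphlineSink code below.

# collection-tier agent (all names, hosts, and ports are placeholders)
collector.sources = avroIn
collector.channels = memCh
collector.sinks = toProcessing

collector.sources.avroIn.type = avro
collector.sources.avroIn.bind = 0.0.0.0
collector.sources.avroIn.port = 4141
collector.sources.avroIn.channels = memCh

collector.channels.memCh.type = memory
collector.channels.memCh.capacity = 10000

collector.sinks.toProcessing.type = avro
collector.sinks.toProcessing.hostname = processing-node
collector.sinks.toProcessing.port = 4242
collector.sinks.toProcessing.channel = memCh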


  This post focuses on how the morphline-avro-sink was written, along with some of the surrounding functionality it needs.

The following points need particular attention:

1. Use the flume-ng-morphline-solr-sink code from the Flume source tree as a reference.

2. Because this sink ultimately has to send data out in Avro format, MorphlineSink extends AbstractRpcSink, the same base class that Flume-ng's AvroSink extends.

3. The RpcClient used by AbstractRpcSink exposes two methods for sending data downstream: RpcClient.append(Event) and RpcClient.appendBatch(List&lt;Event&gt;). MorphlineSink therefore has to take care of initializing the RpcClient properly (a short sketch follows this list).

4. In MorphlineHandlerImpl, a finalChild Command has to be set up when the morphline is compiled. This command is attached as the last command of the morphline chain and receives the records produced by the preceding commands.
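
Returning to point 3, here is a minimal standalone sketch of how an Avro RpcClient is built from a Properties object and fed events; the host name, port, and class name are placeholders, and the full in-sink version appears in the MorphlineSink code below.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class RpcClientSketch {
  public static void main(String[] args) throws EventDeliveryException {
    // Point the client at the downstream Avro source (placeholder host:port).
    Properties props = new Properties();
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "distribution-node:4545");
    RpcClient client = RpcClientFactory.getInstance(props);
    try {
      // Send one event; appendBatch(List<Event>) is the batched variant.
      Event e = EventBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8));
      client.append(e);
    } finally {
      client.close();
    }
  }
}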

Below is the code, MorphlineSink first and then MorphlineHandlerImpl; the places that differ from flume-ng-morphline-solr-sink are marked with // DIFF comments.


  

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flume.sink.avro.morphline;

import java.util.Properties;
import java.util.Map.Entry;

import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractRpcSink;
import org.kitesdk.morphline.api.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume sink that extracts search documents from Flume events and processes them using a morphline
 * {@link Command} chain.
 */
public class MorphlineSink extends AbstractRpcSink implements Configurable {

  private RpcClient client;
  private Properties clientProps;
  private int maxBatchSize = 1000;
  private long maxBatchDurationMillis = 1000;
  private String handlerClass;
  private MorphlineHandler handler;
  private Context context;
  private SinkCounter sinkCounter;

  public static final String BATCH_SIZE = "batchSize";
  public static final String BATCH_DURATION_MILLIS = "batchDurationMillis";
  public static final String HANDLER_CLASS = "handlerClass";

  private static final Logger LOGGER = LoggerFactory.getLogger(MorphlineSink.class);

  public MorphlineSink() {
    this(null);
  }

  /** For testing only */
  protected MorphlineSink(MorphlineHandler handler) {
    this.handler = handler;
  }

  @Override
  public void configure(Context context) {
    this.context = context;
    maxBatchSize = context.getInteger(BATCH_SIZE, maxBatchSize);
    maxBatchDurationMillis = context.getLong(BATCH_DURATION_MILLIS, maxBatchDurationMillis);
    handlerClass = context.getString(HANDLER_CLASS, MorphlineHandlerImpl.class.getName());
    if (sinkCounter == null) {
      LOGGER.info("sinkCount is null");
      sinkCounter = new SinkCounter(getName());
    }
    /*LOGGER.info("sinkCount is " + sinkCounter.toString());*/
    clientProps = new Properties();
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
        "h1", context.getString("hostname") + ":" + context.getInteger("port"));
    for (Entry<String, String> entry : context.getParameters().entrySet()) {
      clientProps.setProperty(entry.getKey(), entry.getValue());
    }
    client = initializeRpcClient(clientProps);  // DIFF: differs from flume-ng-morphline-solr-sink
    if (handler == null) {
      MorphlineHandler tmpHandler;
      try {
        tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
      } catch (Exception e) {
        throw new ConfigurationException(e);
      }
      tmpHandler.configure(context);
      handler = tmpHandler;
    }
    super.configure(context);
  }

  /**
   * Returns the maximum number of events to take per flume transaction;
   * override to customize
   */
  private int getMaxBatchSize() {
    return maxBatchSize;
  }

  /** Returns the maximum duration per flume transaction; override to customize */
  private long getMaxBatchDurationMillis() {
    return maxBatchDurationMillis;
  }

  /*@Override
  public synchronized void start() {
    LOGGER.info("Starting Morphline Sink {} ...", this);
    sinkCounter.start();
    if (handler == null) {
      MorphlineHandler tmpHandler;
      try {
        tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
      } catch (Exception e) {
        throw new ConfigurationException(e);
      }
      tmpHandler.configure(context);
      handler = tmpHandler;
    }
    super.start();
    LOGGER.info("Morphline Sink {} started.", getName());
  }

  @Override
  public synchronized void stop() {
    LOGGER.info("Morphline Sink {} stopping...", getName());
    try {
      if (handler != null) {
        handler.stop();
      }
      sinkCounter.stop();
      LOGGER.info("Morphline Sink {} stopped. Metrics: {}, {}", getName(), sinkCounter);
    } finally {
      super.stop();
    }
  }*/

  @Override
  public Status process() throws EventDeliveryException {
    int batchSize = getMaxBatchSize();
    long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
    Channel myChannel = getChannel();
    Transaction txn = myChannel.getTransaction();
    txn.begin();
    boolean isMorphlineTransactionCommitted = true;
    try {
      int numEventsTaken = 0;
      handler.beginTransaction();
      isMorphlineTransactionCommitted = false;
      //      List<Event> events = Lists.newLinkedList();
      // repeatedly take and process events from the Flume queue
      for (int i = 0; i < batchSize; i++) {
        Event event = myChannel.take();
        if (event == null) {
          break;
        }
        sinkCounter.incrementEventDrainAttemptCount();
        numEventsTaken++;
        //        LOGGER.info("Flume event: {}", event);
        //StreamEvent streamEvent = createStreamEvent(event);
        handler.process(event, client);  // DIFF: hand each event plus the RpcClient to the morphline handler
        //        events.add(event);
        if (System.currentTimeMillis() >= batchEndTime) {
          break;
        }
      }
      //      handler.process(events, client);
      // update metrics
      if (numEventsTaken == 0) {
        sinkCounter.incrementBatchEmptyCount();
      }
      if (numEventsTaken < batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }
      handler.commitTransaction();
      isMorphlineTransactionCommitted = true;
      txn.commit();
      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
      return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      // Ooops - need to rollback and back off
      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
          + ". Exception follows.", t);
      try {
        if (!isMorphlineTransactionCommitted) {
          handler.rollbackTransaction();
        }
      } catch (Throwable t2) {
        LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
            "Exception follows.", t2);
      } finally {
        try {
          txn.rollback();
        } catch (Throwable t4) {
          LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
              "Exception follows.", t4);
        }
      }
      if (t instanceof Error) {
        throw (Error) t; // rethrow original exception
      } else if (t instanceof ChannelException) {
        return Status.BACKOFF;
      } else {
        throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
      }
    } finally {
      txn.close();
    }
  }

  @Override
  public String toString() {
    int i = getClass().getName().lastIndexOf('.') + 1;
    String shortClassName = getClass().getName().substring(i);
    return getName() + " (" + shortClassName + ")";
  }

  @Override
  protected RpcClient initializeRpcClient(Properties props) {
    LOGGER.info("Attempting to create Avro Rpc client.");
    return RpcClientFactory.getInstance(props);
  }
}
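
For reference, here is a hedged sketch of how this sink might be wired up in flume.conf. The agent and channel names, host, port, and file path are placeholders; the property keys are the ones read by the code above (hostname and port are also what AbstractRpcSink expects) and by MorphlineHandlerImpl below.

# processing-tier agent (names, host, port, and paths are placeholders)
proc.sinks = morphlineAvro
proc.sinks.morphlineAvro.type = org.apache.flume.sink.avro.morphline.MorphlineSink
proc.sinks.morphlineAvro.channel = memCh
# downstream Avro source of the distribution layer
proc.sinks.morphlineAvro.hostname = distribution-node
proc.sinks.morphlineAvro.port = 4545
proc.sinks.morphlineAvro.batchSize = 1000
proc.sinks.morphlineAvro.batchDurationMillis = 1000
# morphline compiled by MorphlineHandlerImpl
proc.sinks.morphlineAvro.morphlineFile = /etc/flume/conf/morphline.conf
proc.sinks.morphlineAvro.morphlineId = morphline1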


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flume.sink.avro.morphline;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineCompilationException;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.FaultTolerance;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Metrics;
import org.kitesdk.morphline.base.Notifications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ListMultimap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * A {@link MorphlineHandler} that processes its events using a morphline
 * {@link Command} chain.
 */
public class MorphlineHandlerImpl implements MorphlineHandler {

  private MorphlineContext morphlineContext;
  private Command morphline;
  private Command finalChild;
  private String morphlineFileAndId;
  private Timer mappingTimer;
  private Meter numRecords;
  private Meter numFailedRecords;
  private Meter numExceptionRecords;

  public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
  public static final String MORPHLINE_ID_PARAM = "morphlineId";

  /**
   * Morphline variables can be passed from flume.conf to the morphline, e.g.:
   * agent.sinks.solrSink.morphlineVariable.zkHost=127.0.0.1:2181/solr
   */
  public static final String MORPHLINE_VARIABLE_PARAM = "morphlineVariable";

  private static final Logger LOG = LoggerFactory.getLogger(MorphlineHandlerImpl.class);

  // For test injection
  void setMorphlineContext(MorphlineContext morphlineContext) {
    this.morphlineContext = morphlineContext;
  }

  // for interceptor
  void setFinalChild(Command finalChild) {
    this.finalChild = finalChild;
  }

  @Override
  public void configure(Context context) {
    String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
    String morphlineId = context.getString(MORPHLINE_ID_PARAM);
    if (morphlineFile == null || morphlineFile.trim().length() == 0) {
      throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
    }
    morphlineFileAndId = morphlineFile + "@" + morphlineId;
    if (morphlineContext == null) {
      FaultTolerance faultTolerance = new FaultTolerance(
          context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
          context.getBoolean(FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS, false),
          context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES));
      morphlineContext = new MorphlineContext.Builder()
          .setExceptionHandler(faultTolerance)
          .setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId))
          .build();
    }
    Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
    finalChild = new CollectorB();  // DIFF: the collector that receives the morphline's output records
    morphline = new Compiler().compile(new File(morphlineFile),
        morphlineId, morphlineContext, finalChild, override);
    this.mappingTimer = morphlineContext.getMetricRegistry().timer(
        MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
    this.numRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
    this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", "numFailedRecords"));
    this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", "numExceptionRecords"));
  }

  @Override
  public void process(Event event, RpcClient client) {
    // LOG.info("entry into MorphlineHandlerImpl  process" + event);
    numRecords.mark();
    Timer.Context timerContext = mappingTimer.time();
    try {
      Record record = new Record();
      for (Entry<String, String> entry : event.getHeaders().entrySet()) {
        record.put(entry.getKey(), entry.getValue());
      }
      byte[] bytes = event.getBody();
      if (bytes != null && bytes.length > 0) {
        record.put(Fields.ATTACHMENT_BODY, bytes);
      }
      try {
        Notifications.notifyStartSession(morphline);
        if (!morphline.process(record)) {
          numFailedRecords.mark();
          LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
        }
        // DIFF: drain the records collected by CollectorB, flatten each record's
        // fields into Flume event headers, and forward the events over the RpcClient
        Map<String, String> headers = null;
        List<Record> tmp = ((CollectorB) finalChild).getRecords();
        List<Record> records = new ArrayList<Record>();
        records.addAll(tmp);
        tmp.clear();
        // LOG.info("records     00000---------   " + records.size());
        Iterator irt = records.iterator();
        while (irt.hasNext()) {
          Record r = (Record) irt.next();
          headers = new HashMap<String, String>();
          ListMultimap<String, Object> lmt = r.getFields();
          Map<String, Collection<Object>> m = lmt.asMap();
          Iterator it = m.entrySet().iterator();
          while (it.hasNext()) {
            Entry<String, Object> entry = (Entry<String, Object>) it.next();
            if (entry.getValue() != null) {
              List v = (List) entry.getValue();
              if (v.get(0) != null) {
                headers.put(entry.getKey(), v.get(0).toString());
              }
            }
          }
          Event e = EventBuilder.withBody(event.getBody(), headers);
          client.append(e);
        }
      } catch (RuntimeException t) {
        numExceptionRecords.mark();
        morphlineContext.getExceptionHandler().handleException(t, record);
      } catch (EventDeliveryException e1) {
        numExceptionRecords.mark();
        morphlineContext.getExceptionHandler().handleException(e1, record);
      }
    } finally {
      timerContext.stop();
    }
  }

  @Override
  public void beginTransaction() {
    Notifications.notifyBeginTransaction(morphline);
  }

  @Override
  public void commitTransaction() {
    Notifications.notifyCommitTransaction(morphline);
  }

  @Override
  public void rollbackTransaction() {
    Notifications.notifyRollbackTransaction(morphline);
  }

  @Override
  public void stop() {
    Notifications.notifyShutdown(morphline);
  }

  // DIFF: finalChild command that simply collects the records emitted by the morphline chain
  public static final class CollectorB implements Command {

    private final List<Record> results = new ArrayList<Record>();

    public List<Record> getRecords() {
      return results;
    }

    public void reset() {
      results.clear();
    }

    @Override
    public Command getParent() {
      return null;
    }

    @Override
    public void notify(Record notification) {
    }

    @Override
    public boolean process(Record record) {
      Preconditions.checkNotNull(record);
      results.add(record);
      return true;
    }
  }
}
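
For completeness, here is a minimal sketch of a morphline file that this handler could compile. The command names come from the Kite Morphlines standard library; the field names and the separator are placeholders for whatever the real log format needs.

# morphline.conf (sketch)
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # turn the event body (Fields.ATTACHMENT_BODY) into one record per line,
      # stored in the "message" field
      { readLine { charset : UTF-8 } }

      # split the line into fields; CollectorB receives each resulting record
      {
        split {
          inputField : message
          outputFields : [timestamp, level, text]
          separator : " "
          isRegex : false
          trim : true
        }
      }
    ]
  }
]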
The above took several days of effort to get working. I am posting it in order to:
  1. Record the knowledge as a memo for myself.
  2. Be of some help to others working on the same thing.
  3. Ask for corrections: is there a better way, or does this implementation hide other pitfalls?
  4. Exchange ideas and, hopefully, improve.
