设为首页 收藏本站
查看: 1347|回复: 0

[经验分享] flume源码分析-Sink

[复制链接]

尚未签到

发表于 2017-5-21 14:42:20 | 显示全部楼层 |阅读模式
  Sink 从 channel 中取出 event,然后将 event 发往目标地址。

/**
*
* A simple sink which reads events from a channel and writes them to HBase.
* This Sink uses an asynchronous API internally and is likely to
* perform better.
* The HBase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
* events from the channel, and writing them to HBase, to minimize the number
* of flushes on the HBase tables. To use this sink, it has to be configured
* with certain mandatory parameters:<p>*/
public class AsyncHBaseSink extends AbstractSink implements Configurable {
/**
* Takes up to <tt>batchSize</tt> events from the channel inside one channel
* transaction, submits the put and increment requests produced by the
* serializer through the asynchronous client, and then blocks until every
* submitted request has reported back, a failure has been flagged, or a wait
* on the condition times out.
*
* @return <tt>Status.BACKOFF</tt> if the channel returned <tt>null</tt> before
* the batch was full, otherwise <tt>Status.READY</tt>
* @throws EventDeliveryException if the sink is not open, or if the
* transaction was flagged as failed (including by a callback timeout)
*/
@Override
public Status process() throws EventDeliveryException {
/*
* Reference to the boolean representing failure of the current transaction.
* Since each txn gets a new boolean, failure of one txn will not affect
* the next even if errbacks for the current txn get called while
* the next one is being processed.
*
*/
// Refuse to run at all when the sink has not been opened.
if(!open){
throw new EventDeliveryException("Sink was never opened. " +
"Please fix the configuration.");
}
// Per-call state shared with the callbacks: failure flag, number of
// callbacks that have fired, and number of requests submitted.
AtomicBoolean txnFail = new AtomicBoolean(false);
AtomicInteger callbacksReceived = new AtomicInteger(0);
AtomicInteger callbacksExpected = new AtomicInteger(0);
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
/*
* Callbacks can be reused per transaction, since they share the same
* locks and conditions.
*/
Callback<Object, Object> putSuccessCallback =
new SuccessCallback<Object, Object>(
lock, callbacksReceived, condition);
Callback<Object, Object> putFailureCallback =
new FailureCallback<Object, Object>(
lock, callbacksReceived, txnFail, condition);
Callback<Long, Long> incrementSuccessCallback =
new SuccessCallback<Long, Long>(
lock, callbacksReceived, condition);
Callback<Long, Long> incrementFailureCallback =
new FailureCallback<Long, Long>(
lock, callbacksReceived, txnFail, condition);
Status status = Status.READY;
Channel channel = getChannel();
// i counts the events taken so far; it is read after the loop for the counters.
int i = 0;
try {
txn = channel.getTransaction();
txn.begin();
for (; i < batchSize; i++) {
Event event = channel.take();
if (event == null) {
// Channel ran dry: ask for back-off and record whether the batch was
// completely empty or merely short.
status = Status.BACKOFF;
if (i == 0) {
sinkCounter.incrementBatchEmptyCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
break;
} else {
serializer.setEvent(event);
List<PutRequest> actions = serializer.getActions();
List<AtomicIncrementRequest> increments = serializer.getIncrements();
// Raise the expected count before submitting, so the wait loop below
// knows how many callbacks must arrive.
callbacksExpected.addAndGet(actions.size() + increments.size());
for (PutRequest action : actions) {
client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
}
for (AtomicIncrementRequest increment : increments) {
client.atomicIncrement(increment).addCallbacks(
incrementSuccessCallback, incrementFailureCallback);
}
}
}
} catch (Throwable e) {
// NOTE(review): execution falls through to the wait loop below unless
// checkIfChannelExceptionAndThrow always throws - presumably it does; confirm.
this.handleTransactionFailure(txn);
this.checkIfChannelExceptionAndThrow(e);
}
if (i == batchSize) {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(i);
lock.lock();
try {
// Wait until all callbacks have arrived, the transaction fails, or the wait times out.
while ((callbacksReceived.get() < callbacksExpected.get())
&& !txnFail.get()) {
try {
// await() returning false means the timeout elapsed; flag the
// transaction as failed so the loop exits.
if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
txnFail.set(true);
logger.warn("HBase callbacks timed out. "
+ "Transaction will be rolled back.");
}
} catch (Exception ex) {
logger.error("Exception while waiting for callbacks from HBase.");
this.handleTransactionFailure(txn);
Throwables.propagate(ex);
}
}
} finally {
lock.unlock();
}
/*
* At this point, either the txn has failed
* or all callbacks received and txn is successful.
*
* This need not be in the monitor, since all callbacks for this txn
* have been received. So txnFail will not be modified any more(even if
* it is, it is set from true to true only - false happens only
* in the next process call).
*
*/
if (txnFail.get()) {
this.handleTransactionFailure(txn);
throw new EventDeliveryException("Could not write events to Hbase. " +
"Transaction failed, and rolled back.");
} else {
try{
txn.commit();
txn.close();
sinkCounter.addToEventDrainSuccessCount(i);
} catch (Throwable e) {
this.handleTransactionFailure(txn);
this.checkIfChannelExceptionAndThrow(e);
}
}
return status;
}
}

/**
* <p>
* A {@link Sink} implementation that can send events to an RPC server (such as
* Flume's {@link AvroSource}).
* </p>
* <p>
* This sink forms one half of Flume's tiered collection support. Events sent to
* this sink are transported over the network to the hostname / port pair using
* the RPC implementation encapsulated in {@link RpcClient}.
* The destination is an instance of Flume's {@link AvroSource}, which
* allows Flume agents to forward to other Flume agents, forming a tiered
* collection infrastructure. Of course, nothing prevents one from using this
* sink to speak to other custom built infrastructure that implements the same
* RPC protocol.
* </p>
* <p>
* Events are taken from the configured {@link Channel} in batches of the
* configured <tt>batch-size</tt>. The batch size has no theoretical limits
* although all events in the batch <b>must</b> fit in memory. Generally, larger
* batches are far more efficient, but introduce a slight delay (measured in
* millis) in delivery. The batch behavior is such that underruns (i.e. batches
* smaller than the configured batch size) are possible. This is a compromise
* made to maintain low latency of event delivery. If the channel returns a null
* event, meaning it is empty, the batch is immediately sent, regardless of
* size. Batch underruns are tracked in the metrics. Empty batches do not incur
* an RPC roundtrip.*/
public class AvroSink extends AbstractSink implements Configurable {

  /**
   * Drains at most one client-sized batch of events from the channel and
   * forwards it through the RPC client inside a single channel transaction.
   *
   * <p>An empty batch is committed without calling the client and yields
   * {@code Status.BACKOFF}. Any failure rolls the transaction back: an
   * {@link Error} is rethrown unchanged, a {@code ChannelException} is logged
   * and turned into {@code Status.BACKOFF}, and anything else tears down the
   * connection and is wrapped in an {@link EventDeliveryException}.
   *
   * @return {@code Status.READY} when events were sent, otherwise
   *         {@code Status.BACKOFF}
   * @throws EventDeliveryException if sending the batch fails
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    Status outcome = Status.READY;

    try {
      transaction.begin();
      verifyConnection();

      // Keep taking until the batch is full or the channel runs dry.
      List<Event> pending = Lists.newLinkedList();
      while (pending.size() < client.getBatchSize()) {
        Event next = channel.take();
        if (next == null) {
          break;
        }
        pending.add(next);
      }

      int taken = pending.size();
      int capacity = client.getBatchSize();

      if (taken > 0) {
        if (taken >= capacity) {
          sinkCounter.incrementBatchCompleteCount();
        } else {
          sinkCounter.incrementBatchUnderflowCount();
        }
        sinkCounter.addToEventDrainAttemptCount(taken);
        client.appendBatch(pending);
      } else {
        // Nothing to send: no RPC call is made for an empty batch.
        sinkCounter.incrementBatchEmptyCount();
        outcome = Status.BACKOFF;
      }

      transaction.commit();
      sinkCounter.addToEventDrainSuccessCount(taken);
    } catch (Throwable failure) {
      transaction.rollback();
      if (failure instanceof Error) {
        throw (Error) failure;
      }
      if (!(failure instanceof ChannelException)) {
        // Not a channel problem: drop the connection before reporting.
        destroyConnection();
        throw new EventDeliveryException("Failed to send events", failure);
      }
      logger.error("Avro Sink " + getName() + ": Unable to get event from" +
          " channel " + channel.getName() + ". Exception follows.", failure);
      outcome = Status.BACKOFF;
    } finally {
      transaction.close();
    }

    return outcome;
  }
}


/**
*
* A simple sink which reads events from a channel and writes them to HBase.
* The HBase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
* events from the channel, and writing them to Hbase, to minimize the number
* of flushes on the hbase tables. To use this sink, it has to be configured
* with certain mandatory parameters:<p>
* <tt>table: </tt> The name of the table in Hbase to write to. <p>
* <tt>columnFamily: </tt> The column family in Hbase to write to.<p>
* This sink will commit each transaction if the table's write buffer size is
* reached or if the number of events in the current transaction reaches the
* batch size, whichever comes first.<p>
* Other optional parameters are:<p>
* <tt>serializer:</tt> A class implementing {@link HBaseEventSerializer}.
*  An instance of
* this class will be used to write out events to hbase.<p>
* <tt>serializer.*:</tt> Passed in the configure() method to serializer
* as an object of {@link org.apache.flume.Context}.<p>
* <tt>batchSize: </tt>This is the batch size used by the client. This is the
* maximum number of events the sink will commit per transaction. The default
* batch size is 100 events.
* <p>*/
public class HBaseSink extends AbstractSink implements Configurable {
@Override
public Status process() throws EventDeliveryException {
Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
List<Row> actions = new LinkedList<Row>();
List<Increment> incs = new LinkedList<Increment>();
txn.begin();
for(long i = 0; i < batchSize; i++) {
Event event = channel.take();
if(event == null){
status = Status.BACKOFF;
counterGroup.incrementAndGet("channel.underflow");
break;
} else {
serializer.initialize(event, columnFamily);
actions.addAll(serializer.getActions());
incs.addAll(serializer.getIncrements());
}
}
putEventsAndCommit(actions, incs, txn);
return status;
}
private void putEventsAndCommit(List<Row> actions, List<Increment> incs,
Transaction txn) throws EventDeliveryException {
try {
table.batch(actions);
for(Increment i: incs){
table.increment(i);
}
txn.commit();
counterGroup.incrementAndGet("transaction.success");
} catch (Throwable e) {
try{
txn.rollback();
} catch (Exception e2) {
logger.error("Exception in rollback. Rollback might not have been" +
"successful." , e2);
}
counterGroup.incrementAndGet("transaction.rollback");
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
if(e instanceof Error || e instanceof RuntimeException){
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
Throwables.propagate(e);
} else {
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
throw new EventDeliveryException("Failed to commit transaction." +
"Transaction rolled back.", e);
}
} finally {
txn.close();
}
}
}

public class HDFSEventSink extends AbstractSink implements Configurable {

  /**
   * Moves up to {@code txnEventMax} events from the channel to HDFS within
   * one channel transaction. For each event the target path is derived from
   * the event headers, the matching {@code BucketWriter} is looked up in
   * {@code sfWriters} (and created and cached on first use), and the event is
   * appended to it. Writers touched in this transaction whose batch is not
   * complete are flushed before the transaction is committed. <br />
   * WARNING: NOT THREAD SAFE
   *
   * @return {@code Status.BACKOFF} if the last take returned {@code null} or
   *         an {@link IOException} occurred, otherwise {@code Status.READY}
   * @throws EventDeliveryException if any other non-{@link Error} failure
   *         occurs; the transaction is rolled back first
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> touched = Lists.newArrayList();
    transaction.begin();
    try {
      Event current = null;
      int taken = 0;
      while (taken < txnEventMax) {
        current = channel.take();
        if (current == null) {
          break;
        }

        // Resolve the placeholders in the configured path for this event.
        String realPath = BucketPath.escapeString(path, current.getHeaders(),
            needRounding, roundUnit, roundValue);

        BucketWriter writer = sfWriters.get(realPath);
        if (writer == null) {
          // First event for this path: create the writer and cache it.
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
          FlumeFormatter formatter =
              HDFSFormatterFactory.getFormatter(writeFormat);
          writer = new BucketWriter(rollInterval, rollSize, rollCount,
              batchSize, context, realPath, codeC, compType, hdfsWriter,
              formatter, timedRollerPool, proxyTicket, sinkCounter);
          sfWriters.put(realPath, writer);
        }

        // Remember each writer once so it can be flushed before commit.
        if (!touched.contains(writer)) {
          touched.add(writer);
        }

        append(writer, current);
        taken++;
      }

      if (taken == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (taken < txnEventMax) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }

      // Everything written in this transaction must be flushed before commit.
      for (BucketWriter writer : touched) {
        if (!writer.isBatchComplete()) {
          flush(writer);
        }
      }

      transaction.commit();

      if (taken > 0) {
        sinkCounter.addToEventDrainSuccessCount(taken);
      }

      // A null last event means the channel ran dry during this call.
      return (current == null) ? Status.BACKOFF : Status.READY;
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (!(th instanceof Error)) {
        throw new EventDeliveryException(th);
      }
      throw (Error) th;
    } finally {
      transaction.close();
    }
  }
}

// IRC is a network transport protocol.
public class IRCSink extends AbstractSink implements Configurable {

  /**
   * Takes a single event from the channel and sends it as a line over the
   * IRC connection, all inside one channel transaction.
   *
   * <p>This method reports failures through its return value: every caught
   * exception rolls the transaction back, is logged, and results in
   * {@code Status.BACKOFF}. For exceptions other than
   * {@code ChannelException} the connection is destroyed as well.
   *
   * @return {@code Status.READY} when an event was sent, otherwise
   *         {@code Status.BACKOFF}
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();

    try {
      transaction.begin();
      createConnection();

      Event taken = channel.take();
      Status outcome;
      if (taken != null) {
        sendLine(taken);
        counterGroup.incrementAndGet("event.irc");
        outcome = Status.READY;
      } else {
        counterGroup.incrementAndGet("event.empty");
        outcome = Status.BACKOFF;
      }

      transaction.commit();
      return outcome;
    } catch (ChannelException e) {
      transaction.rollback();
      logger.error(
          "Unable to get event from channel. Exception follows.", e);
      return Status.BACKOFF;
    } catch (Exception e) {
      transaction.rollback();
      logger.error(
          "Unable to communicate with IRC server. Exception follows.",
          e);
      // Drop the connection after a non-channel failure.
      destroyConnection();
      return Status.BACKOFF;
    } finally {
      transaction.close();
    }
  }
}

public class LoggerSink extends AbstractSink {

  private static final Logger logger =
      LoggerFactory.getLogger(LoggerSink.class);

  /**
   * Takes one event from the channel and writes a dump of it to the log at
   * INFO level (when INFO is enabled), inside a single channel transaction.
   *
   * @return {@code Status.BACKOFF} when the channel had no event, otherwise
   *         {@code Status.READY}
   * @throws EventDeliveryException if any exception occurs during the
   *         transaction; the transaction is rolled back first
   */
  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    // Declared outside the try so the failure message can include it.
    Event event = null;

    try {
      transaction.begin();
      event = channel.take();

      Status outcome;
      if (event == null) {
        // Nothing to log: ask the sink runner to back off.
        outcome = Status.BACKOFF;
      } else {
        if (logger.isInfoEnabled()) {
          logger.info("Event: " + EventHelper.dumpEvent(event));
        }
        outcome = Status.READY;
      }

      transaction.commit();
      return outcome;
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to log event: " + event, ex);
    } finally {
      transaction.close();
    }
  }
}

  RollingFileSink:每隔一个 sink.rollInterval 周期就会生成一个新的文件,用来存储 event。

public class RollingFileSink extends AbstractSink implements Configurable {

  /**
   * Writes one event from the channel to the current output file.
   *
   * <p>Before touching the channel, the method closes and rotates the current
   * file if {@code shouldRotate} is set and a stream is open, then opens a
   * stream for the current file if none is open. The event is then taken,
   * serialized and flushed inside a single channel transaction.
   *
   * @return {@code Status.BACKOFF} when the channel had no event, otherwise
   *         {@code Status.READY}
   * @throws EventDeliveryException if rotating or opening the file fails, or
   *         if any exception occurs while processing the event
   */
  @Override
  public Status process() throws EventDeliveryException {
    if (shouldRotate) {
      logger.debug("Time to rotate {}", pathController.getCurrentFile());
      if (outputStream != null) {
        closeAndRotateCurrentFile();
      }
    }

    if (outputStream == null) {
      openCurrentFile();
    }

    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    // Declared outside the try so the failure message can include it.
    Event event = null;

    try {
      transaction.begin();
      event = channel.take();

      Status outcome;
      if (event == null) {
        // Channel is empty: ask the runner to back off.
        outcome = Status.BACKOFF;
      } else {
        serializer.write(event);
        /*
         * FIXME: Feature: Rotate on size and time by checking bytes written
         * and setting shouldRotate = true if we're past a threshold.
         */
        /*
         * FIXME: Feature: Control flush interval based on time or number of
         * events. For now, every write is flushed immediately.
         */
        serializer.flush();
        outputStream.flush();
        outcome = Status.READY;
      }

      transaction.commit();
      return outcome;
    } catch (Exception ex) {
      transaction.rollback();
      throw new EventDeliveryException("Failed to process event: " + event, ex);
    } finally {
      transaction.close();
    }
  }

  /**
   * Flushes and closes the open stream, clears the rotate flag, drops the
   * serializer and stream references and advances the path controller.
   * If closing fails, the references and the path controller are left as
   * they were.
   */
  private void closeAndRotateCurrentFile() throws EventDeliveryException {
    logger.debug("Closing file {}", pathController.getCurrentFile());
    try {
      serializer.flush();
      serializer.beforeClose();
      outputStream.flush();
      outputStream.close();
      shouldRotate = false;
    } catch (IOException e) {
      throw new EventDeliveryException("Unable to rotate file "
          + pathController.getCurrentFile() + " while delivering event", e);
    }
    serializer = null;
    outputStream = null;
    pathController.rotate();
  }

  /**
   * Opens a buffered stream on the path controller's current file and creates
   * the serializer that writes to it.
   */
  private void openCurrentFile() throws EventDeliveryException {
    File target = pathController.getCurrentFile();
    logger.debug("Opening output stream for file {}", target);
    try {
      outputStream = new BufferedOutputStream(new FileOutputStream(target));
      serializer = EventSerializerFactory.getInstance(
          serializerType, serializerContext, outputStream);
      serializer.afterCreate();
    } catch (IOException e) {
      throw new EventDeliveryException("Failed to open file "
          + pathController.getCurrentFile() + " while delivering event", e);
    }
  }
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379658-1-1.html 上篇帖子: flume源码分析-source 下篇帖子: Flume架构与源码分析-整体架构
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表