[Experience Sharing] Flume Source Code Analysis: Channel

A Channel acts as a pipe between the two ends of an agent: a source writes events into the channel and a sink takes events out of it. Flume ships three channel types: memory, file, and jdbc. The memory channel is the fastest but loses its events if the machine goes down; the file channel persists events so nothing is lost; the jdbc channel is the slowest. In practice the file channel is the usual choice.
 
A source hands each log event to ChannelProcessor.processEvent. For every channel the processor obtains a Transaction, calls tx.begin(), puts the event into the channel, and then commits. If an exception is thrown it calls rollback() on the transaction; in either case tx.close() is finally called to close the transaction.

public void processEvent(Event event) {
  event = interceptorChain.intercept(event);
  if (event == null) {
    return;
  }
  // Process required channels
  List<Channel> requiredChannels = selector.getRequiredChannels(event);
  for (Channel reqChannel : requiredChannels) {
    Transaction tx = reqChannel.getTransaction();
    Preconditions.checkNotNull(tx, "Transaction object must not be null");
    try {
      tx.begin();
      reqChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      if (t instanceof Error) {
        LOG.error("Error while writing to required channel: " + reqChannel, t);
        throw (Error) t;
      } else {
        throw new ChannelException("Unable to put event on required " +
            "channel: " + reqChannel, t);
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
  // Process optional channels
  List<Channel> optionalChannels = selector.getOptionalChannels(event);
  for (Channel optChannel : optionalChannels) {
    Transaction tx = null;
    try {
      tx = optChannel.getTransaction();
      tx.begin();
      optChannel.put(event);
      tx.commit();
    } catch (Throwable t) {
      tx.rollback();
      LOG.error("Unable to put event on optional channel: " + optChannel, t);
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      if (tx != null) {
        tx.close();
      }
    }
  }
}
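The code above is the producing side of the transaction protocol. A sink drives the mirror image: take instead of put, commit once the event has been delivered downstream, rollback otherwise. Below is a minimal sketch of a sink's process() body against the Channel/Transaction and Sink APIs; deliver() is a placeholder for the sink's downstream write, not Flume code.

// Minimal sketch of the sink-side counterpart to processEvent():
// take an event inside a transaction, commit on success, roll back on failure.
public Status process() throws EventDeliveryException {
  Channel channel = getChannel();
  Transaction tx = channel.getTransaction();
  try {
    tx.begin();
    Event event = channel.take();            // may be null when the channel is empty
    if (event != null) {
      deliver(event);                        // placeholder for the sink's downstream write
    }
    tx.commit();
    return event != null ? Status.READY : Status.BACKOFF;
  } catch (Throwable t) {
    tx.rollback();                           // the taken event goes back into the channel
    throw new EventDeliveryException("Unable to deliver event from channel: " + channel, t);
  } finally {
    tx.close();
  }
}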
Let's look at the corresponding MemoryChannel methods first. On the source side, MemoryChannel first places each event into putList; when commit is called the events are moved from putList into the queue, and on rollback they are simply dropped from putList. On the sink side, take moves an event from the queue into takeList; commit clears takeList, while rollback moves the events in takeList back into the queue.
   

// createTransaction simply returns a new MemoryTransaction
@Override
protected BasicTransactionSemantics createTransaction() {
  return new MemoryTransaction(transCapacity, channelCounter);
}

@Override
protected void doPut(Event event) throws InterruptedException {
  int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
  if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) {
    putList.offer(event);
    putByteCounter += eventByteSize;
  }
  // (the branch that rejects the event when space cannot be acquired is omitted in this excerpt)
}

@Override
protected Event doTake() throws InterruptedException {
  Event event;
  synchronized (queueLock) {
    event = queue.poll();
  }
  takeList.put(event);
  return event;
}

@Override
protected void doCommit() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  synchronized (queueLock) {
    if (puts > 0) {
      while (!putList.isEmpty()) {
        queue.offer(putList.removeFirst());
      }
    }
    putList.clear();
    takeList.clear();
  }
}

@Override
protected void doRollback() {
  int takes = takeList.size();
  while (!takeList.isEmpty()) {
    queue.addFirst(takeList.removeLast());
  }
  putList.clear();
}
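To see the putList/takeList bookkeeping in isolation, here is a small self-contained model of the same idea. It is an illustration only, not the real MemoryChannel: it ignores capacity limits, locking, and byte accounting, and the class and field names are made up.

import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingDeque;

// Simplified model of MemoryChannel's transaction buffers.
class MemoryChannelModel {
  private final LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<String>();
  private final LinkedList<String> putList = new LinkedList<String>();
  private final LinkedList<String> takeList = new LinkedList<String>();

  void put(String e) { putList.offer(e); }                 // staged, not yet visible to sinks

  String take() {
    String e = queue.poll();
    if (e != null) { takeList.add(e); }                    // remembered so rollback can undo it
    return e;
  }

  void commit() {
    while (!putList.isEmpty()) { queue.offer(putList.removeFirst()); } // puts become visible
    putList.clear();
    takeList.clear();                                      // takes become permanent
  }

  void rollback() {
    putList.clear();                                       // staged puts are dropped
    while (!takeList.isEmpty()) { queue.addFirst(takeList.removeLast()); } // takes go back, order preserved
  }

  public static void main(String[] args) {
    MemoryChannelModel ch = new MemoryChannelModel();
    ch.put("e1");
    ch.commit();                  // "e1" is now in the queue
    String e = ch.take();
    ch.rollback();                // "e1" is back at the head of the queue
    System.out.println(e + " restored: " + "e1".equals(ch.take()));
  }
}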
FileChannel stores its data in files on disk. Its createTransaction method creates a FileBackedTransaction object. On the source side, doPut calls log.put, which wraps the event together with the transactionID into a ByteBuffer, writes it to a data file, and returns the fileID and offset. doTake calls log.take, which wraps the transactionID, fileID, and offset into a ByteBuffer and writes it to the log file. rollback writes a record containing the transactionID, and commit writes the transactionID together with the TransactionEventRecord.Type. The log files live under dataDirs and are named with the prefix "log-". In short, the file channel behaves like the memory channel, but it keeps the data in files and records every operation as a write-ahead log entry. (Reference: http://www.tuicool.com/articles/B3UbYn)

@Override
protected BasicTransactionSemantics createTransaction() {
  trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
      transactionCapacity, keepAlive, queueRemaining, getName(),
      channelCounter);
  transactions.set(trans);
  return trans;
}
@Override
protected void doPut(Event event) throws InterruptedException {
  boolean success = false;
  FlumeEventPointer ptr = log.put(transactionID, event);
  Preconditions.checkState(putList.offer(ptr), "putList offer failed "
      + channelNameDescriptor);
}
@Override
protected Event doTake() throws InterruptedException {
  /*
   * 1. Take an event which is in the queue.
   * 2. If getting that event does not throw NoopRecordException,
   *    then return it.
   * 3. Else try to retrieve the next event from the queue
   * 4. Repeat 2 and 3 until queue is empty or an event is returned.
   */
  while (true) {
    FlumeEventPointer ptr = queue.removeHead(transactionID);
    if (ptr == null) {
      return null;
    } else {
      try {
        // first add to takeList so that if write to disk
        // fails rollback actually does its work
        Preconditions.checkState(takeList.offer(ptr),
            "takeList offer failed " + channelNameDescriptor);
        log.take(transactionID, ptr); // write take to disk
        Event event = log.get(ptr);
        return event;
      } catch (IOException e) {
        throw new ChannelException("Take failed due to IO error "
            + channelNameDescriptor, e);
      } catch (NoopRecordException e) {
        takeList.remove(ptr);
      }
    }
  }
}
@Override
protected void doCommit() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  if (puts > 0) {
    log.commitPut(transactionID);
    synchronized (queue) {
      while (!putList.isEmpty()) {
        queue.addTail(putList.removeFirst());
      }
    }
  } else if (takes > 0) {
    log.commitTake(transactionID);
    queue.completeTransaction(transactionID);
  }
  putList.clear();
  takeList.clear();
}
@Override
protected void doRollback() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  if (takes > 0) {
    while (!takeList.isEmpty()) {
      Preconditions.checkState(queue.addHead(takeList.removeLast()),
          "Queue add failed, this shouldn't be able to happen "
          + channelNameDescriptor);
    }
  }
  putList.clear();
  takeList.clear();
  queue.completeTransaction(transactionID);
  log.rollback(transactionID);
}

At a configurable interval (the checkpointInterval property), the file channel writes a checkpoint: FlumeEventQueue persists the state of the in-memory queue to the checkpoint file, and markCheckpoint records in each log file the write position that the checkpoint covers.


 

class FlumeEventQueue {
  synchronized boolean checkpoint(boolean force) {
    if (!elements.syncRequired() && !force) {
      LOG.debug("Checkpoint not required");
      return false;
    }
    // Start checkpoint
    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
    // write the header information, including the queue head position and the queue size
    updateHeaders();
    // encode each active log file's id and its event count into the buffer
    List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
    for (Integer fileId : fileIDCounts.keySet()) {
      Integer count = fileIDCounts.get(fileId).get();
      long value = encodeActiveLogCounter(fileId, count);
      fileIdAndCountEncoded.add(value);
    }
    int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
    for (int i = 0; i < emptySlots; i++) {
      fileIdAndCountEncoded.add(0L);
    }
    for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
      elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
    }
    // sync the elements into elementsBuffer; elements stores the fileIDs and offsets
    elements.sync();
    // Finish checkpoint
    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
    // force the mapped buffer out to the storage device
    mappedBuffer.force();
    return true;
  }
}
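The CHECKPOINT_INCOMPLETE / CHECKPOINT_COMPLETE markers around the writes are what make the checkpoint crash-safe: if the process dies halfway through, the marker is left at the incomplete value, so on restart the half-written checkpoint is not trusted and the channel falls back to replaying the data files. The pattern in isolation looks roughly like this (an illustrative sketch with placeholder constants and layout, not Flume's actual checkpoint format):

import java.nio.MappedByteBuffer;

class CheckpointMarkerSketch {
  static final int MARKER_OFFSET = 0;                 // placeholder: marker stored in the first slot
  static final long INCOMPLETE = 0L, COMPLETE = 1L;   // placeholder marker values

  // Two-phase marker: a crash between steps 1 and 3 leaves the marker at
  // INCOMPLETE, so a recovering reader ignores this checkpoint.
  static void writeCheckpoint(MappedByteBuffer buffer, long[] queueState) {
    buffer.putLong(MARKER_OFFSET, INCOMPLETE);        // 1. invalidate before touching the data
    for (int i = 0; i < queueState.length; i++) {
      buffer.putLong((i + 1) * 8, queueState[i]);     // 2. write the queue state after the marker
    }
    buffer.putLong(MARKER_OFFSET, COMPLETE);          // 3. mark valid only once the data is in place
    buffer.force();                                   // 4. flush the mapped pages to the device
  }
}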
    
class LogFile {
  // record the current write position (and timestamp) in the log file header
  synchronized void markCheckpoint(long timestamp) throws IOException {
    long currentPosition = writeFileChannel.position();
    writeFileHandle.seek(checkpointPositionMarker);
    writeFileHandle.writeLong(currentPosition);
    writeFileHandle.writeLong(timestamp);
    writeFileChannel.position(currentPosition);
    LOG.info("Noted checkpoint for file: " + file + ", id: " + fileID
        + ", checkpoint position: " + currentPosition);
  }
}
After a crash, Flume replays the log files on startup to restore the channel to the state it was in just before the crash:
void replay() throws IOException {
  Preconditions.checkState(!open, "Cannot replay after Log has been opened");
  checkpointWriterLock.lock();
  try {
    /*
     * First we are going to look through the data directories
     * and find all log files. We will store the highest file id
     * (at the end of the filename) we find and use that when we
     * create additional log files.
     *
     * Also store up the list of files so we can replay them later.
     */
    LOGGER.info("Replay started");
    nextFileID.set(0);
    List<File> dataFiles = Lists.newArrayList();
    for (File logDir : logDirs) {
      for (File file : LogUtils.getLogs(logDir)) {
        int id = LogUtils.getIDForFile(file);
        dataFiles.add(file);
        nextFileID.set(Math.max(nextFileID.get(), id));
        idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX
            + id)));
      }
    }
    LOGGER.info("Found NextFileID " + nextFileID +
        ", from " + Arrays.toString(logDirs));
    /*
     * sort the data files by file id so we can replay them by file id
     * which should approximately give us sequential events
     */
    LogUtils.sort(dataFiles);
    /*
     * Read the checkpoint (in memory queue) from one of two alternating
     * locations. We will read the last one written to disk.
     */
    queue = new FlumeEventQueue(queueCapacity,
        new File(checkpointDir, "checkpoint"), channelName);
    long ts = queue.getTimestamp();
    LOGGER.info("Last Checkpoint " + new Date(ts) +
        ", queue depth = " + queue.getSize());
    /*
     * We now have everything we need to actually replay the log files
     * the queue, the timestamp the queue was written to disk, and
     * the list of data files.
     */
    ReplayHandler replayHandler = new ReplayHandler(queue);
    replayHandler.replayLog(dataFiles);
    for (int index = 0; index < logDirs.length; index++) {
      LOGGER.info("Rolling " + logDirs[index]);
      roll(index);
    }
    /*
     * Now that we have replayed, write the current queue to disk
     */
    writeCheckpoint(true);
    open = true;
  } catch (Exception ex) {
    LOGGER.error("Failed to initialize Log on " + channelNameDescriptor, ex);
    if (ex instanceof IOException) {
      throw (IOException) ex;
    }
    Throwables.propagate(ex);
  } finally {
    checkpointWriterLock.unlock();
  }
}
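replayLog walks the sorted data files and re-applies the PUT / TAKE / COMMIT / ROLLBACK records to rebuild the queue, with a transaction's puts and takes taking effect only once its COMMIT record is seen. A simplified model of that folding logic is sketched below; it is illustrative only, not the real ReplayHandler, and it uses a bare long in place of FlumeEventPointer.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of replaying write-ahead-log records into an in-memory queue.
class ReplayModel {
  private final Map<Long, List<Long>> pendingPuts = new HashMap<Long, List<Long>>();
  private final Map<Long, List<Long>> pendingTakes = new HashMap<Long, List<Long>>();
  private final Deque<Long> queue = new ArrayDeque<Long>();   // holds event pointers

  void onPut(long txId, long pointer) {
    pending(pendingPuts, txId).add(pointer);                  // staged until COMMIT
  }

  void onTake(long txId, long pointer) {
    pending(pendingTakes, txId).add(pointer);                 // staged until COMMIT
  }

  void onCommit(long txId) {
    for (Long p : pending(pendingPuts, txId)) { queue.addLast(p); }  // committed puts become visible
    for (Long p : pending(pendingTakes, txId)) { queue.remove(p); }  // committed takes are gone for good
    clear(txId);
  }

  void onRollback(long txId) {
    clear(txId);                                              // nothing from this transaction survives
  }

  private List<Long> pending(Map<Long, List<Long>> map, long txId) {
    List<Long> list = map.get(txId);
    if (list == null) { list = new ArrayList<Long>(); map.put(txId, list); }
    return list;
  }

  private void clear(long txId) { pendingPuts.remove(txId); pendingTakes.remove(txId); }
}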
