/**
 * Creates a new {@code FileBackedTransaction} for the caller and registers it
 * via {@code transactions.set(...)} before returning it.
 *
 * @return the newly created transaction
 */
@Override
protected BasicTransactionSemantics createTransaction() {
  // Declared as a local: the original assigned to an undeclared name. A shared
  // field would let concurrent callers overwrite each other's transaction.
  // NOTE(review): transactions is presumably a ThreadLocal holder - confirm.
  FileBackedTransaction trans = new FileBackedTransaction(log,
      TransactionIDOracle.next(), transactionCapacity, keepAlive,
      queueRemaining, getName(), channelCounter);
  transactions.set(trans);
  return trans;
}
/**
 * Writes {@code event} to the log under this transaction and remembers the
 * returned pointer in {@code putList} until the transaction commits or rolls
 * back.
 *
 * @param event the event to store
 * @throws ChannelException if the log write fails with an I/O error
 * @throws IllegalStateException if the pointer cannot be added to putList
 */
@Override
protected void doPut(Event event) throws InterruptedException {
  try {
    FlumeEventPointer ptr = log.put(transactionID, event);
    Preconditions.checkState(putList.offer(ptr), "putList offer failed "
        + channelNameDescriptor);
  } catch (IOException e) {
    // Same translation as doTake: callers see a ChannelException, not a
    // checked IOException.
    throw new ChannelException("Put failed due to IO error "
        + channelNameDescriptor, e);
  }
}
@Override
protected Event doTake() throws InterruptedException {
/*
* 1. Take an event which is in the queue.
* 2. If getting that event does not throw NoopRecordException,
* then return it.
* 3. Else try to retrieve the next event from the queue
* 4. Repeat 2 and 3 until queue is empty or an event is returned.
*/
try {
while (true) {
FlumeEventPointer ptr = queue.removeHead(transactionID);
if (ptr == null) {
return null;
} else {
try {
// first add to takeList so that if write to disk
// fails rollback actually does it's work
Preconditions.checkState(takeList.offer(ptr),
"takeList offer failed "
+ channelNameDescriptor);
log.take(transactionID, ptr); // write take to disk
Event event = log.get(ptr);
return event;
} catch (IOException e) {
throw new ChannelException("Take failed due to IO error "
+ channelNameDescriptor, e);
} catch (NoopRecordException e) {
takeList.remove(ptr);
}
}
}
@Override
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
if(puts > 0) {
log.commitPut(transactionID);
synchronized (queue) {
while(!putList.isEmpty()) {
queue.addTail(putList.removeFirst())
}
} else if (takes > 0) {
log.commitTake(transactionID);
queue.completeTransaction(transactionID);
}
putList.clear();
takeList.clear();
}
/**
 * Rolls back this transaction.
 *
 * <p>Pointers taken in this transaction are pushed back onto the head of the
 * queue. The last one taken goes first, so the original order is restored.
 * Pending puts are discarded. The transaction is then completed on the queue
 * and rolled back in the log.
 *
 * @throws IllegalStateException if a pointer cannot be re-added to the queue
 * @throws ChannelException if the log rollback fails with an I/O error
 */
@Override
protected void doRollback() throws InterruptedException {
  if (!takeList.isEmpty()) {
    // Re-insert the batch under the queue monitor, as the put-commit path does.
    // Code synchronizing on queue then cannot interleave with it.
    synchronized (queue) {
      while (!takeList.isEmpty()) {
        Preconditions.checkState(queue.addHead(takeList.removeLast()),
            "Queue add failed, this shouldn't be able to happen "
                + channelNameDescriptor);
      }
    }
  }
  putList.clear();
  takeList.clear();
  queue.completeTransaction(transactionID);
  try {
    log.rollback(transactionID);
  } catch (IOException e) {
    throw new ChannelException("Rollback failed due to IO error "
        + channelNameDescriptor, e);
  }
}
File Channel 会定期（每隔一段时间）执行一次 checkpoint：
FlumeEventQueue{
synchronized boolean checkpoint(boolean force) {
if (!elements.syncRequired() && !force) {
LOG.debug("Checkpoint not required");
return false;
}
// Start checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
//插入header信息,包括queue的header位置,queue的size
updateHeaders();
//将 文件的Id和文件的个数写入buffre中
List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
for (Integer fileId : fileIDCounts.keySet()) {
Integer count = fileIDCounts.get(fileId).get();
long value = encodeActiveLogCounter(fileId, count);
fileIdAndCountEncoded.add(value);
}
int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size();
for (int i = 0; i < emptySlots; i++) {
fileIdAndCountEncoded.add(0L);
}
for (int i = 0; i < MAX_ACTIVE_LOGS; i++) {
elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i));
}
//将elements中的对象sync到elementsBuffer中,elements中存储了fileId以及transcation的offset
elements.sync();
// Finish checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
//将数据同步到device
mappedBuffer.force();
return true;
}
}
class LogFile {
  /**
   * Records a checkpoint in this log file.
   *
   * <p>The current write position and {@code timestamp} are written as two
   * longs at {@code checkpointPositionMarker}. The write position is then
   * restored so appends continue where they left off.
   *
   * @param timestamp the checkpoint timestamp to record
   * @throws IOException if seeking or writing fails
   */
  synchronized void markCheckpoint(long timestamp) throws IOException {
    long currentPosition = writeFileChannel.position();
    try {
      writeFileHandle.seek(checkpointPositionMarker);
      writeFileHandle.writeLong(currentPosition);
      writeFileHandle.writeLong(timestamp);
    } finally {
      // Always return to the append position, even if a write failed.
      // Otherwise the next append would land in the checkpoint-marker area.
      // (Restoring via writeFileChannel after seeking writeFileHandle implies
      // the two share one file position.)
      writeFileChannel.position(currentPosition);
    }
    LOG.info("Noted checkpoint for file: " + file + ", id: " + fileID
        + ", checkpoint position: " + currentPosition);
  }
}
Flume 崩溃（crash）后，会通过 replay 将状态恢复到崩溃之前：
/**
 * Rebuilds the log state from disk. Must be called before the Log is opened.
 *
 * <p>It indexes every data file, loads the last checkpoint into a new
 * FlumeEventQueue and replays the data files on top of it. It then rolls
 * every log directory, writes a fresh checkpoint and marks the Log open.
 * The whole run holds {@code checkpointWriterLock}.
 *
 * @throws IOException if replay fails with an I/O error
 * @throws IllegalStateException if the Log is already open
 * @throws RuntimeException wrapping any other checked failure
 */
void replay() throws IOException {
  Preconditions.checkState(!open, "Cannot replay after Log has been opened");
  checkpointWriterLock.lock();
  try {
    /*
     * Find all log files in the data directories. Remember the highest file
     * id (the filename suffix) for naming new log files, and keep the file
     * list for replay.
     */
    LOGGER.info("Replay started");
    nextFileID.set(0);
    List<File> dataFiles = Lists.newArrayList();
    for (File logDir : logDirs) {
      for (File file : LogUtils.getLogs(logDir)) {
        int id = LogUtils.getIDForFile(file);
        dataFiles.add(file);
        nextFileID.set(Math.max(nextFileID.get(), id));
        idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX
            + id)));
      }
    }
    LOGGER.info("Found NextFileID " + nextFileID +
        ", from " + Arrays.toString(logDirs));
    // Sort by file id so replay runs in (approximately) event order.
    LogUtils.sort(dataFiles);
    /*
     * Load the last checkpoint (the in-memory queue).
     * NOTE(review): an older comment here said the checkpoint is read from
     * one of two alternating locations, but only a single "checkpoint" file
     * is passed below - that comment may be stale; confirm.
     */
    queue = new FlumeEventQueue(queueCapacity,
        new File(checkpointDir, "checkpoint"), channelName);
    long ts = queue.getTimestamp();
    LOGGER.info("Last Checkpoint " + new Date(ts) +
        ", queue depth = " + queue.getSize());
    // The queue, its checkpoint timestamp and the sorted data files are all
    // that is needed to replay.
    ReplayHandler replayHandler = new ReplayHandler(queue);
    replayHandler.replayLog(dataFiles);
    for (int index = 0; index < logDirs.length; index++) {
      LOGGER.info("Rolling " + logDirs[index]);
      roll(index);
    }
    // Persist the replayed queue before opening.
    writeCheckpoint(true);
    open = true;
  } catch (IOException ex) {
    LOGGER.error("Failed to initialize Log on " + channelNameDescriptor, ex);
    throw ex;
  } catch (Exception ex) {
    LOGGER.error("Failed to initialize Log on " + channelNameDescriptor, ex);
    // propagate() rethrows unchecked exceptions as-is and wraps checked ones
    // in RuntimeException. The explicit throw marks this path as terminal.
    throw Throwables.propagate(ex);
  } finally {
    checkpointWriterLock.unlock();
  }
}