org.apache.flume.channel.BasicTransactionSemantics An implementation of basic Transaction semantics designed to work in concert with BasicChannelSemantics to simplify creation of robust Channel implementations. This class ensures that each transaction implementation method is called only while the transaction is in the correct state for that method, and only by the thread that created the transaction. Nested calls to begin() and close() are supported as long as they are balanced.
Subclasses need only implement doPut, doTake, doCommit, and doRollback, and the developer can rest assured that those methods are called only after transaction state preconditions have been properly met. doBegin and doClose may also be implemented if there is work to be done at those points.
All InterruptedException exceptions thrown from the implementations of the doXXX methods are automatically wrapped to become ChannelExceptions, but only after restoring the interrupted status of the thread so that any subsequent blocking method calls will themselves throw InterruptedException rather than blocking. The exception to this rule is doTake, which simply returns null instead of wrapping and propagating the InterruptedException, though it still first restores the interrupted status of the thread.
protected void
put(Event event)
The method to which BasicChannelSemantics delegates calls to put.
void
rollback()
Indicates that the transaction can must be aborted.
protected Event
take()
The method to which BasicChannelSemantics delegates calls to take.
String
toString()
其中put take begin close commit rollback 的结构都很相似。主要结构都是确保这些操作时Transaction在正确的对应状态,然后调用doXXX方法。如果当前的线程不拥有当前的事务或者事务的状态不对,就抛出异常。如果doXXX方法抛出InterruptedException就通过Thread.currentThread.interrupt()方法恢复当前线程的interrupted状态,然后将捕获的InterruptedException包装成一个ChannelException,抛出。
@Override
public void rollback() {
Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
"rollback() called from different thread than getTransaction()!");
Preconditions.checkState(state.equals(State.OPEN),
"rollback() called when transaction is %s!", state);
state = State.COMPLETED;
try {
doRollback();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ChannelException(e.toString(), e);
}
}