public int getBatchSize();
public void append(Event event) throws EventDeliveryException;
public void appendBatch(List<Event> events) throws
EventDeliveryException;
public boolean isActive();
public void close() throws FlumeException;
}
// NOTE(review): fragment — these statements presumably run in the client's
// configure/constructor path; confirm against the enclosing method.
// Pick the first configured host as the connection target for this client.
HostInfo host = HostInfo.getHostInfoList(properties).get(0);
hostname = host.getHostName();
port = host.getPortNumber();
// ClientWrapper
// NOTE(review): this constructor fragment duplicates the one inside the
// ClientWrapper class below; shown here out of its enclosing class.
// Opens a framed Thrift transport to (hostname, port) and builds a
// compact-protocol client over it. Throws if the connection cannot be opened.
public ClientWrapper() throws Exception{
// Build the TSocket from hostname and port.
transport = new TFastFramedTransport(new TSocket(hostname, port));
transport.open();
client = new ThriftSourceProtocol.Client(new TCompactProtocol
(transport));
// Not a great hash code, but since this class is immutable and there
// is at most one instance of the components of this class,
// this works fine [If the objects are equal, hash code is the same]
hashCode = random.nextInt();
}
private class ClientWrapper {
  public final ThriftSourceProtocol.Client client;
  public final TFastFramedTransport transport;
  private final int hashCode;

  /**
   * Opens a framed transport to the configured {@code hostname}/{@code port}
   * and wraps it in a compact-protocol Thrift client.
   *
   * @throws Exception if the underlying socket cannot be opened.
   */
  public ClientWrapper() throws Exception {
    TSocket socket = new TSocket(hostname, port);
    transport = new TFastFramedTransport(socket);
    transport.open();
    TCompactProtocol protocol = new TCompactProtocol(transport);
    client = new ThriftSourceProtocol.Client(protocol);
    // Not a great hash code, but this class is immutable and holds at most
    // one instance of each component, so equal objects share a hash code.
    hashCode = random.nextInt();
  }
}
/**
 * Bounded pool of {@link ClientWrapper} connections guarded by a single
 * {@link ReentrantLock}. Threads check clients out, return them via
 * {@link #checkIn}, or discard broken ones via {@link #destroy}.
 */
private class ConnectionPoolManager {
  private final Queue<ClientWrapper> availableClients;
  private final Set<ClientWrapper> checkedOutClients;
  private final int maxPoolSize;
  private int currentPoolSize;
  private final Lock poolLock;
  private final Condition availableClientsCondition;

  public ConnectionPoolManager(int poolSize) {
    this.maxPoolSize = poolSize;
    availableClients = new LinkedList<ClientWrapper>();
    checkedOutClients = new HashSet<ClientWrapper>();
    poolLock = new ReentrantLock();
    availableClientsCondition = poolLock.newCondition();
    currentPoolSize = 0;
  }

  /**
   * Returns a pooled client, creating one if the pool has spare capacity,
   * otherwise blocking until a client is checked in or destroyed.
   *
   * @throws Exception if a new connection cannot be established.
   */
  public ClientWrapper checkout() throws Exception {
    ClientWrapper ret = null;
    poolLock.lock();
    try {
      // FIX: also re-check freed capacity after waking. Previously a thread
      // waited only on availableClients, so capacity released by destroy()
      // could never unblock it (deadlock).
      while (availableClients.isEmpty() && currentPoolSize >= maxPoolSize) {
        availableClientsCondition.await();
      }
      if (availableClients.isEmpty()) {
        // Spare capacity: open a fresh connection.
        ret = new ClientWrapper();
        currentPoolSize++;
      } else {
        ret = availableClients.poll();
      }
      checkedOutClients.add(ret);
    } finally {
      poolLock.unlock();
    }
    return ret;
  }

  /** Returns a healthy client to the pool and wakes one waiter. */
  public void checkIn(ClientWrapper client) {
    poolLock.lock();
    try {
      availableClients.add(client);
      checkedOutClients.remove(client);
      availableClientsCondition.signal();
    } finally {
      poolLock.unlock();
    }
  }

  /** Discards a broken client, freeing one slot of pool capacity. */
  public void destroy(ClientWrapper client) {
    poolLock.lock();
    try {
      checkedOutClients.remove(client);
      currentPoolSize--;
      // FIX: capacity was freed, so wake a waiter; without this signal a
      // blocked checkout() could wait forever.
      availableClientsCondition.signal();
    } finally {
      poolLock.unlock();
    }
    // Close outside the lock: transport.close() may block on I/O.
    client.transport.close();
  }

  /** Closes every pooled connection, including checked-out ones. */
  public void closeAll() {
    poolLock.lock();
    try {
      for (ClientWrapper c : availableClients) {
        c.transport.close();
        currentPoolSize--;
      }
      // FIX: drop the dead wrappers; previously they stayed queued and a
      // later checkout() could hand out a closed transport.
      availableClients.clear();
      /*
       * Be cruel and close even the checked out clients. The threads writing
       * using these will now get an exception.
       */
      for (ClientWrapper c : checkedOutClients) {
        c.transport.close();
        currentPoolSize--;
      }
      // Wake every waiter so none hangs on a pool that is being torn down.
      availableClientsCondition.signalAll();
    } finally {
      poolLock.unlock();
    }
  }
}
}
/**
 * {@link OrderSelector} that hands out the active objects in round-robin
 * order, remembering where the previous iteration stopped.
 *
 * @param <T> type of the objects being selected.
 */
public class RoundRobinOrderSelector<T> extends OrderSelector<T> {

  // Index (into the active list) where the next iteration should begin.
  private int nextHead = 0;

  public RoundRobinOrderSelector(boolean shouldBackOff) {
    super(shouldBackOff);
  }

  /**
   * Builds an iterator that visits every currently-active object once,
   * starting one position after the previous call's starting point.
   */
  @Override
  public Iterator<T> createIterator() {
    List<Integer> activeIndices = getIndexList();
    int size = activeIndices.size();
    // possible that the size has shrunk so gotta adjust nextHead for that
    if (nextHead >= size) {
      nextHead = 0;
    }
    int begin = nextHead++;
    if (nextHead == activeIndices.size()) {
      nextHead = 0;
    }
    int[] indexOrder = new int[size];
    for (int i = 0; i < size; i++) {
      // FIX: was "indexOrder = activeIndices.get(...)" — assigned an Integer
      // to the array reference instead of filling element i (does not compile).
      indexOrder[i] = activeIndices.get((begin + i) % size);
    }
    return new SpecificOrderIterator<T>(indexOrder, getObjects());
  }
}
public void append(Event event) throws EventDeliveryException {
//Why a local variable rather than just calling getClient()?
//If we get an EventDeliveryException, we need to call close on
//that specific client, getClient in this case, will get us
//the next client - leaving a resource leak.
RpcClient localClient = null;
synchronized (this) {
if (!isActive) {
logger.error("Attempting to append to an already closed client.");
throw new EventDeliveryException(
"Attempting to append to an already closed client.");
}
}
// Sit in an infinite loop and try to append!
int tries = 0;
while (tries < maxTries) {
try {
tries++;
localClient = getClient();
localClient.append(event);
return;
} catch (EventDeliveryException e) {
// Could not send event through this client, try to pick another client.
logger.warn("Client failed. Exception follows: ", e);
localClient.close();
localClient = null;
} catch (Exception e2) {
logger.error("Failed to send event: ", e2);
throw new EventDeliveryException(
"Failed to send event. Exception follows: ", e2);
}
}
logger.error("Tried many times, could not send event."
throw new EventDeliveryException("Failed to send the event!");
}