public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
//in case disconnect has been called
if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
if ( key.isConnectable() ) {
if ( socketChannel.finishConnect() ) {
completeConnect();
if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return false;
} else {
//wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
}//end if
} else if ( key.isWritable() ) {
boolean writecomplete = write(key);
if ( writecomplete ) {
//we are completed, should we read an ack?
if ( waitForAck ) {
//register to read the ack
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
//if not, we are ready, setMessage will reregister us for another write interest
//do a health check, we have no way of verify a disconnected
//socket since we don't register for OP_READ on waitForAck=false
read(key);//this causes overhead
setRequestCount(getRequestCount()+1);
return true;
}
} else {
//we are not complete, lets write some more
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
}//end if
} else if ( key.isReadable() ) {
boolean readcomplete = read(key);
if ( readcomplete ) {
setRequestCount(getRequestCount()+1);
return true;
} else {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
}//end if
} else {
//unknown state, should never happen
log.warn("Data is in unknown state. readyOps="+ops);
throw new IOException("Data is in unknown state. readyOps="+ops);
}//end if
return false;
}
if ( !connected ) throw new ChannelException("Sender not connected.");
ParallelNioSender sender = (ParallelNioSender)getSender();
if (sender == null) {
ChannelException cx = new ChannelException("Unable to retrieve a data sender, time out error.");
for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination, new NullPointerException("Unable to retrieve a sender from the sender pool"));
throw cx;
} else {
try {
sender.sendMessage(destination, message);
sender.keepalive();
} catch (ChannelException x) {
sender.disconnect();
throw x;
} finally {
returnSender(sender);
if (!connected) disconnect();
}
}
这个地方调用了ParallelNioSender的sendMessage()方法后,还调用了keepalive(),这个方法去除NioSender的集合中不在存活状态的NioSender.最后归还NioSender.
看下ParallelNioSender的sendMessage()方法,主要是先拿到需要传播的NioSender数组,之后一个个的去传送。
如果NioSender数组在超时的时间限制内没有发送完成的话,就会抛出异常.主要是在doLoop()方法里面执行的,这里如果出现异常的话,还有NioSender重连等机制试图恢复NioSender的功能
看tomcat的设计,关于资源池的设计,基本上有两个集合,一个是已经使用的集合,一个是未使用的集合,当未使用集合还有资源的话,现在未使用的集合,当没有的话,在小于最大限度的情况下,可以重新new一个,当一个资源使用完毕的话,放入到未使用的集合可以重新利用。有些时候,一个资源是否可以放入未使用集合是有考虑的,比如他的持续的服务时间是否过长,如果过长就不会放在,因为这个资源过老的话,可能利用的资源不能得到释放,最坏的情况是资源被耗尽,如内存泄漏等,这样还不如重新new一个新的,这些都是我们设计资源池需要考虑的