tomcat NioSender
上次说了NioReceiver,这次看看NioSender,NioSender的体系结构比较复杂的,但感觉不用这么复杂,先看下类图:这里觉得ParallelNioSender和PooledParallelSender之间有一个类可以去掉,如去了PooledParallelSender,PooledSender类设计成接口,这样的话会比较简洁.
ChannelCoordinator使用的是PooledParallelSender,最后面最重要的是还是NioSender。
NioSender是个状态机,看注释:
* - NOT_CONNECTED -> connect() -> CONNECTED
* - CONNECTED -> setMessage() -> READY TO WRITE
* - READY_TO_WRITE -> write() -> READY TO WRITE | READY TO READ
* - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE
* - TRANSFER_COMPLETE -> CONNECTED
这样可以循环的利用:
public boolean process(SelectionKey key, boolean waitForAck) throws IOException {
int ops = key.readyOps();
key.interestOps(key.interestOps() & ~ops);
//in case disconnect has been called
if ((!isConnected()) && (!connecting)) throw new IOException("Sender has been disconnected, can't selection key.");
if ( !key.isValid() ) throw new IOException("Key is not valid, it must have been cancelled.");
if ( key.isConnectable() ) {
if ( socketChannel.finishConnect() ) {
completeConnect();
if ( current != null ) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
return false;
} else{
//wait for the connection to finish
key.interestOps(key.interestOps() | SelectionKey.OP_CONNECT);
return false;
}//end if
} else if ( key.isWritable() ) {
boolean writecomplete = write(key);
if ( writecomplete ) {
//we are completed, should we read an ack?
if ( waitForAck ) {
//register to read the ack
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
//if not, we are ready, setMessage will reregister us for another write interest
//do a health check, we have no way of verify a disconnected
//socket since we don't register for OP_READ on waitForAck=false
read(key);//this causes overhead
setRequestCount(getRequestCount()+1);
return true;
}
} else {
//we are not complete, lets write some more
key.interestOps(key.interestOps()|SelectionKey.OP_WRITE);
}//end if
} else if ( key.isReadable() ) {
boolean readcomplete = read(key);
if ( readcomplete ) {
setRequestCount(getRequestCount()+1);
return true;
} else {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
}//end if
} else {
//unknown state, should never happen
log.warn("Data is in unknown state. readyOps="+ops);
throw new IOException("Data is in unknown state. readyOps="+ops);
}//end if
return false;
}
首先屏蔽了SelectionKey的interest集合中的ready集合,只有在这一次处理完以后才会恢复,这样的话下一次的数据不会影响到这一次的数,之后就是 SelectionKey的各种状态的处理,其中read()是调用write()后用于接受ack回执的.
从ChannelCoordinator开始,来看看NioSender的调用的流程:
ChannelCoordinator.sendMessage()---->ReplicationTransmitter.sendMessage()---->PooledParallelSender.sendMessage()
---->ParallelNioSender.sendMessage()---->ParallelNioSender.doLoop()---->NioSender.process()
可以吧PooledParallelSender看成一个门面,在这里是NioSender的入口,PooledSender里面主要是一个资源池的作用,这样可以合理利用和控制资源。
看PooledParallelSender的sendMessage()方法:
if ( !connected ) throw new ChannelException("Sender not connected.");
ParallelNioSender sender = (ParallelNioSender)getSender();
if (sender == null) {
ChannelException cx = new ChannelException("Unable to retrieve a data sender, time out error.");
for (int i = 0; i < destination.length; i++) cx.addFaultyMember(destination, new NullPointerException("Unable to retrieve a sender from the sender pool"));
throw cx;
} else {
try {
sender.sendMessage(destination, message);
sender.keepalive();
} catch (ChannelException x) {
sender.disconnect();
throw x;
} finally {
returnSender(sender);
if (!connected) disconnect();
}
}
这个地方调用了ParallelNioSender的sendMessage()方法后,还调用了keepalive(),这个方法去除NioSender的集合中不在存活状态的NioSender.最后归还NioSender.
看下ParallelNioSender的sendMessage()方法,主要是先拿到需要传播的NioSender数组,之后一个个的去传送。
如果NioSender数组在超时的时间限制内没有发送完成的话,就会抛出异常.主要是在doLoop()方法里面执行的,这里如果出现异常的话,还有NioSender重连等机制试图恢复NioSender的功能
看tomcat的设计,关于资源池的设计,基本上有两个集合,一个是已经使用的集合,一个是未使用的集合,当未使用集合还有资源的话,现在未使用的集合,当没有的话,在小于最大限度的情况下,可以重新new一个,当一个资源使用完毕的话,放入到未使用的集合可以重新利用。有些时候,一个资源是否可以放入未使用集合是有考虑的,比如他的持续的服务时间是否过长,如果过长就不会放在,因为这个资源过老的话,可能利用的资源不能得到释放,最坏的情况是资源被耗尽,如内存泄漏等,这样还不如重新new一个新的,这些都是我们设计资源池需要考虑的
页:
[1]