Manager contextManager = null;
if (manager == null) {
if ( (getCluster() != null) && distributable) {
try {
contextManager = getCluster().createManager(getName());
} catch (Exception ex) {
log.error("standardContext.clusterFail", ex);
ok = false;
}
} else {
contextManager = new StandardManager();
}
}
// Configure default manager if none was specified
if (contextManager != null) {
setManager(contextManager);
}
if (manager!=null && (getCluster() != null) && distributable) {
//let the cluster know that there is a context that is distributable
//and that it has its own manager
getCluster().registerManager(manager);
}
如果配置了tomcat的集群,那么 contextManager = getCluster().createManager(getName())就会被调用,返回的Manager的类型是DeltaManager,接着调用setManager(contextManager)方法:
public synchronized void setManager(Manager manager) {
// Change components if necessary 管理session的manager
Manager oldManager = this.manager;
if (oldManager == manager)
return;
this.manager = manager;
// Stop the old component if necessary
if (started && (oldManager != null) &&
(oldManager instanceof Lifecycle)) {
try {
((Lifecycle) oldManager).stop();
} catch (LifecycleException e) {
log.error("ContainerBase.setManager: stop: ", e);
}
}
// Start the new component if necessary
if (manager != null)
manager.setContainer(this);
if (started && (manager != null) &&
(manager instanceof Lifecycle)) {
try {
((Lifecycle) manager).start();
} catch (LifecycleException e) {
log.error("ContainerBase.setManager: start: ", e);
}
}
// Report this property change to interested listeners
support.firePropertyChange("manager", oldManager, this.manager);
}
public synchronized void getAllClusterSessions() {
if (cluster != null && cluster.getMembers().length > 0) {
long beforeSendTime = System.currentTimeMillis();
Member mbr = findSessionMasterMember();
if(mbr == null) { // No domain member found
return;
}
SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
// set reference time
stateTransferCreateSendTime = beforeSendTime ;
// request session state
counterSend_EVT_GET_ALL_SESSIONS++;
stateTransfered = false ;
// FIXME This send call block the deploy thread, when sender waitForAck is enabled
try {
synchronized(receivedMessageQueue) {
receiverQueue = true ;
}
cluster.send(msg, mbr);
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr,getStateTransferTimeout()));
// FIXME At sender ack mode this method check only the state transfer and resend is a problem!
waitForSendAllSessions(beforeSendTime);
public void send(ClusterMessage msg, Member dest) {
try {
msg.setAddress(getLocalMember());
if (dest != null) {
if (!getLocalMember().equals(dest)) {
channel.send(new Member[] {dest}, msg,channelSendOptions);
} else
log.error("Unable to send message to local member " + msg);
} else {
if (channel.getMembers().length>0)
channel.send(channel.getMembers(),msg,channelSendOptions);
else if (log.isDebugEnabled())
log.debug("No members in cluster, ignoring message:"+msg);
}
} catch (Exception x) {
log.error("Unable to send message through cluster sender.", x);
}
}
实际调用的是GroupChannel的send()方法,另外发送数据的类别是异步的(SEND_OPTIONS_ASYNCHRONOUS)
接着调用的是ChannelInterceptor的sendMessage()方法,这是个责任链模式的应用,其实默认的ChannelInterceptor只有两个:MessageDispatch15Interceptor和TcpFailureDetector。MessageDispatch15Interceptor只是在开了另外一个线程后,在一定条件下在该线程中进行sendMessage()方法调用。TcpFailureDetector的作用是处理sendMessage()方法出异常后对应的集群中的Member。最后面调用的ChannelCoordinator的sendMessage()方法:
public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
if ( destination == null ) destination = membershipService.getMembers();
clusterSender.sendMessage(msg,destination);
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ChannelCoordinator - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+Arrays.toNameString(destination));
}
}
其实调用的是ParallelNioSender的sendMessage()方法,最终调用的是NioSender类进行底层的处理。到这里刚刚启动的tomcat的请求已经完成,这个时候在集群的中的一台tomcat收到消息,进行相应处理,接收消息的类是NioReceiver
让我们转换思想,转换到接收方进行考虑:
NioReceiver在前面的博文中已经详细讲过,其实他就是个后台线程,在不断的跑。在有消息进来,接收到后,每个消息产生一个Runnable任务,然后交给线程池处理。看Runnable任务的NioReplicationTask的drainChannel()方法中,有一段代码:
//process the message ReceiverBase.messageDataReceived()
getCallback().messageDataReceived(msgs);
调用ListenCallback的方法,如下:
public void messageDataReceived(ChannelMessage data) {
if ( this.listener != null ) {
if ( listener.accept(data) ) listener.messageReceived(data);
}
}
这里的listener的实现类是:ChannelCoordinator,看代码是调用的父类的ChannelInterceptorBase的方法:
public void messageReceived(ChannelMessage msg) {
if (getPrevious() != null) getPrevious().messageReceived(msg);
}
这里的ChannelInterceptor还是前面说到的那两个,无关紧要,最终调用了GroupChannel的messageReceived()方法,具体的下一篇文章再说