设为首页 收藏本站
查看: 602|回复: 0

[经验分享] tomcat session复制(二)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2017-1-26 07:47:17 | 显示全部楼层 |阅读模式
  
    上文http://nod0620.iteye.com/admin/blogs/1030398
写了session复制的发送部分,继续接收部分:
  当接收方tomcat接收到需要session的消息时,最终调用了GroupChannel的messageReceived()方法

    public void messageReceived(ChannelMessage msg) {
if ( msg == null ) return;
try {
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Received msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " from "+msg.getAddress().getName());
}
Serializable fwd = null;
if ( (msg.getOptions() & SEND_OPTIONS_BYTE_MESSAGE) == SEND_OPTIONS_BYTE_MESSAGE ) {
fwd = new ByteMessage(msg.getMessage().getBytes());
} else {
try {
fwd = XByteBuffer.deserialize(msg.getMessage().getBytesDirect(), 0, msg.getMessage().getLength());
}catch (Exception sx) {
log.error("Unable to deserialize message:"+msg,sx);
return;
}
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel - Receive Message:" + new UniqueId(msg.getUniqueId()) + " is " +fwd);
}
//get the actual member with the correct alive time
Member source = msg.getAddress();
boolean rx = false;
boolean delivered = false;
for ( int i=0; i<channelListeners.size(); i++ ) {
ChannelListener channelListener = (ChannelListener)channelListeners.get(i);
if (channelListener != null && channelListener.accept(fwd, source)) {
channelListener.messageReceived(fwd, source);
delivered = true;
//if the message was accepted by an RPC channel, that channel
//is responsible for returning the reply, otherwise we send an absence reply
if ( channelListener instanceof RpcChannel ) rx = true;
}
}//for
if ((!rx) && (fwd instanceof RpcMessage)) {
//if we have a message that requires a response,
//but none was given, send back an immediate one
sendNoRpcChannelReply((RpcMessage)fwd,source);
}
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("GroupChannel delivered["+delivered+"] id:"+new UniqueId(msg.getUniqueId()));
}
} catch ( Exception x ) {
//this could be the channel listener throwing an exception, we should log it
//as a warning.
if ( log.isWarnEnabled() ) log.warn("Error receiving message:",x);
throw new RemoteProcessException("Exception:"+x.getMessage(),x);
}
}
  可以看到,这里主要是调用ChannelListener的messageReceived方法,显然这个实现类就是SimpleTcpCluster:

public void messageReceived(Serializable message, Member sender) {
ClusterMessage fwd = (ClusterMessage)message;
fwd.setAddress(sender);
messageReceived(fwd);
}
public void messageReceived(ClusterMessage message) {
long start = 0;
if (log.isDebugEnabled() && message != null)
log.debug("Assuming clocks are synched: Replication for "
+ message.getUniqueId() + " took="
+ (System.currentTimeMillis() - (message).getTimestamp())
+ " ms.");
//invoke all the listeners
boolean accepted = false;
if (message != null) {
for (Iterator iter = clusterListeners.iterator(); iter.hasNext();) {
ClusterListener listener = (ClusterListener) iter.next();
if (listener.accept(message)) {
accepted = true;
listener.messageReceived(message);
}
}
}
if (!accepted && log.isDebugEnabled()) {
if (notifyLifecycleListenerOnFailure) {
Member dest = message.getAddress();
// Notify our interested LifecycleListeners
lifecycle.fireLifecycleEvent(RECEIVE_MESSAGE_FAILURE_EVENT,
new SendMessageData(message, dest, null));
}
log.debug("Message " + message.toString() + " from type "
+ message.getClass().getName()
+ " transfered but no listener registered");
}
return;
}
  其实这里调用的是ClusterListener的messageReceived()方法,默认的两个ClusterListener是ClusterSessionListener和JvmRouteSessionIDBinderListener,JvmRouteSessionIDBinderListener只有在发送的消息是关于session  id改变的消息时才起作用,重点看ClusterSessionListener.messageReceived()方法,其实是调用到了DeltaManager的messageDataReceived()方法:

    public void messageDataReceived(ClusterMessage cmsg) {
if (cmsg != null && cmsg instanceof SessionMessage) {
SessionMessage msg = (SessionMessage) cmsg;
switch (msg.getEventType()) {
case SessionMessage.EVT_GET_ALL_SESSIONS:
case SessionMessage.EVT_SESSION_CREATED:
case SessionMessage.EVT_SESSION_EXPIRED:
case SessionMessage.EVT_SESSION_ACCESSED:
case SessionMessage.EVT_SESSION_DELTA:
case SessionMessage.EVT_CHANGE_SESSION_ID: {
synchronized(receivedMessageQueue) {
if(receiverQueue) {
receivedMessageQueue.add(msg);
return ;
}
}
break;
}
default: {
//we didn't queue, do nothing
break;
}
} //switch
messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null);
}
}
   这里的事件是EVT_GET_ALL_SESSIONS,在messageReceived()方法中有这个分支的处理代码,最后面调用
  handleGET_ALL_SESSIONS()方法:

protected void handleGET_ALL_SESSIONS(SessionMessage msg, Member sender) throws IOException {
counterReceive_EVT_GET_ALL_SESSIONS++;
//get a list of all the session from this manager
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.unloadingBegin", getName()));
// Write the number of active sessions, followed by the details
// get all sessions and serialize without sync
Session[] currentSessions = findSessions();
long findSessionTimestamp = System.currentTimeMillis() ;
if (isSendAllSessions()) {
sendSessions(sender, currentSessions, findSessionTimestamp);
} else {
// send session at blocks
for (int i = 0; i < currentSessions.length; i += getSendAllSessionsSize()) {
int len = i + getSendAllSessionsSize() > currentSessions.length ? currentSessions.length - i : getSendAllSessionsSize();
Session[] sendSessions = new Session[len];
System.arraycopy(currentSessions, i, sendSessions, 0, len);
sendSessions(sender, sendSessions,findSessionTimestamp);
if (getSendAllSessionsWaitTime() > 0) {
try {
Thread.sleep(getSendAllSessionsWaitTime());
} catch (Exception sleep) {
}
}//end if
}//for
}//end if
SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName());
newmsg.setTimestamp(findSessionTimestamp);
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName()));
counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE++;
cluster.send(newmsg, sender);
}

  主要是先找到所有的session,然后发送所有的session,然后发送一个标志结束的消息表示发送完成。这样第一次tomcat的启动加入集群是拿所用的session就完成了,主要的流程是这样的:
  发送方:
  StandardContext.start()--->SimpleTcpCluster.createManager()--->StandardContext.setManager()--->
  DeltaManager.start()--->DeltaManager.getAllClusterSessions()--->SimpleTcpCluster.send()
  ---->GroupChannel.send()---->ParallelNioSender.sendMessage()--->NioSender.process()
  接收方:
  NioReceiver.listen()--->NioReceiver.readDataFromSocket()---->NioReplicationTask.drainChannel()--->
  ListenCallback.messageDataReceived()---->ChannelCoordinator.messageReceived()--->
  GroupChannel.messageReceived()--->SimpleTcpCluster.messageReceived()--->
  DeltaManager.messageDataReceived()---->DeltaManager.handleGET_ALL_SESSIONS()
  接收方准备好数据又要发送给发送方,简单来说通信是这样:
  发送方发送请求--->接收方接受请求,发送数据,一个是session的数据,一个是session发送完毕的数据--->
  发送方收到接收方过来的两种数据

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-333458-1-1.html 上篇帖子: eclipse,tomcat,myeclipse内存设置 下篇帖子: Tomcat java 定时任务
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表