tomcat cluster 源码分析
看完tomcat的源码,迫不及待地想研究下tomcat集群。可惜,网络上对于tomcat集群的源码分析比较少(百度、google都找了,基本没有,只有api),基本上都是讲些tomcat集群的配置以及比较粗浅的东西。无奈之下,只好硬着头皮看代码,做个笔记记录下。首先,在StandardEngine(ContainerBase).start()方法中,对集群进行启动:
// Start the cluster only when it participates in the Lifecycle contract
// (instanceof is false for null, so no separate null check is needed).
if (cluster instanceof Lifecycle) {
    ((Lifecycle) cluster).start();
}
下面是集群启动的方法:
/**
 * Starts the cluster.
 * Fires BEFORE_START_EVENT, applies default components, registers the
 * cluster valves, hooks this cluster into the channel as membership and
 * channel listener, starts the channel and the optional deployer, and
 * finally fires AFTER_START_EVENT.
 *
 * @throws LifecycleException if the cluster is already started, or if any
 *         step of the startup sequence fails (the cause is wrapped)
 */
public void start() throws LifecycleException {
    if (started) {
        throw new LifecycleException(sm.getString("cluster.alreadyStarted"));
    }
    if (log.isInfoEnabled()) {
        log.info("Cluster is about to start");
    }
    // Tell the registered LifecycleListeners that startup is beginning.
    lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, this);
    try {
        // Fill in default listeners, valves, channel and interceptors.
        checkDefaults();
        registerClusterValve();
        channel.addMembershipListener(this);
        channel.addChannelListener(this);
        channel.start(channelStartOptions);
        if (clusterDeployer != null) {
            clusterDeployer.start();
        }
        this.started = true;
        // Tell the registered LifecycleListeners that startup has finished.
        lifecycle.fireLifecycleEvent(AFTER_START_EVENT, this);
    } catch (Exception startFailure) {
        log.error("Unable to start cluster.", startFailure);
        throw new LifecycleException(startFailure);
    }
}
checkDefaults();
/**
 * Installs default components for everything that was left unconfigured:
 * cluster listeners, valves, the channel itself, and the channel's
 * interceptors. Also hands this cluster to the deployer, if one is set.
 */
protected void checkDefaults() {
    // No listeners configured: install the two default listeners.
    if (clusterListeners.size() == 0) {
        addClusterListener(new JvmRouteSessionIDBinderListener());
        addClusterListener(new ClusterSessionListener());
    }
    // No valves configured: install the two default valves.
    if (valves.size() == 0) {
        addValve(new JvmRouteBinderValve());
        addValve(new ReplicationValve());
    }
    if (clusterDeployer != null) {
        clusterDeployer.setCluster(this);
    }
    if (channel == null) {
        channel = new GroupChannel();
    }
    // A GroupChannel without any interceptor gets the default interceptor pair.
    if (channel instanceof GroupChannel) {
        GroupChannel groupChannel = (GroupChannel) channel;
        if (!groupChannel.getInterceptors().hasNext()) {
            channel.addInterceptor(new MessageDispatch15Interceptor());
            channel.addInterceptor(new TcpFailureDetector());
        }
    }
}
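大致相当于下面的 server.xml 配置(注意:该示例比 checkDefaults() 的默认值更完整,例如额外配置了 BackupManager、ThroughputInterceptor 和 FarmWarDeployer,而没有配置 JvmRouteBinderValve,仅供对照参考):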
<!-- Sample cluster definition using SimpleTcpCluster. -->
<Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster" channelSendOptions="6">
<!-- Session manager used for the clustered web applications. -->
<Manager className="org.apache.catalina.ha.session.BackupManager" expireSessionsOnShutdown="false" notifyListenersOnReplication="true" mapSendOptions="6" />
<!-- Group communication channel: membership, receiver, sender and interceptors. -->
<Channel className="org.apache.catalina.tribes.group.GroupChannel">
<Membership className="org.apache.catalina.tribes.membership.McastService" address="228.0.0.4" port="45564" frequency="500" dropTime="3000" />
<Receiver className="org.apache.catalina.tribes.transport.nio.NioReceiver" address="auto" port="5000" selectorTimeout="100" maxThreads="6" />
<Sender className="org.apache.catalina.tribes.transport.ReplicationTransmitter">
<Transport className="org.apache.catalina.tribes.transport.nio.PooledParallelSender" />
</Sender>
<Interceptor className="org.apache.catalina.tribes.group.interceptors.TcpFailureDetector" />
<Interceptor className="org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor" />
<Interceptor className="org.apache.catalina.tribes.group.interceptors.ThroughputInterceptor" />
</Channel>
<!-- Replication valve; the filter attribute lists URL patterns for static resources. -->
<Valve className="org.apache.catalina.ha.tcp.ReplicationValve" filter=".*\.gif;.*\.js;.*\.jpg;.*\.png;.*\.htm;.*\.html;.*\.css;.*\.txt;" />
<!-- Farm deployer; watchEnabled is false in this sample. -->
<Deployer className="org.apache.catalina.ha.deploy.FarmWarDeployer" tempDir="/tmp/war-temp/" deployDir="/tmp/war-deploy/" watchDir="/tmp/war-listen/" watchEnabled="false" />
<ClusterListener className="org.apache.catalina.ha.session.ClusterSessionListener" />
</Cluster>
registerClusterValve();
/**
 * Registers every configured cluster valve with the owning container and
 * binds each valve back to this cluster. Does nothing when no container
 * is set.
 *
 * Null entries in the valve list are skipped. Previously a null entry was
 * dereferenced both by the debug log and by setCluster(), which sat
 * outside the null check, so it raised a NullPointerException.
 *
 * @throws Exception if the reflective addValve call on the container fails
 */
protected void registerClusterValve() throws Exception {
    if (container == null) {
        return;
    }
    for (Iterator iter = valves.iterator(); iter.hasNext();) {
        ClusterValve valve = (ClusterValve) iter.next();
        if (valve == null) {
            // Guard first: everything below dereferences the valve.
            continue;
        }
        if (log.isDebugEnabled()) {
            log.debug("Invoking addValve on " + getContainer()
                    + " with class=" + valve.getClass().getName());
        }
        // Add the valve to the container's valves so that it is invoked
        // through the pipeline when a request is processed.
        IntrospectionUtils.callMethodN(getContainer(), "addValve",
                new Object[] { valve },
                new Class[] { org.apache.catalina.Valve.class });
        valve.setCluster(this);
    }
}
接下来,看看通道的启动:
channel.start(channelStartOptions);其中 channelStartOptions 默认为15,二进制为1111(即四个服务全部启动)。
/**
 * Starts the channel: prepares the default interceptor stack, optionally
 * validates the option flags, delegates to the parent start, and finally
 * launches the heartbeat thread if heartbeats are enabled and no thread
 * exists yet.
 *
 * @param svc the start-level flags passed on to the parent start
 * @throws ChannelException if the underlying start fails
 */
public synchronized void start(int svc) throws ChannelException {
    setupDefaultStack();
    if (optionCheck) {
        checkOptionFlags();
    }
    super.start(svc);
    boolean needsHeartbeatThread = heartbeat && hbthread == null;
    if (needsHeartbeatThread) {
        hbthread = new HeartbeatThread(this, heartbeatSleeptime);
        hbthread.start();
    }
}
首先调用父类的start方法:super.start(svc);
经过拦截器链一层一层地向下调用,最后调用到下面的 internalStart 方法:
/**
 * Starts the channel services selected by {@code svc}. The method may be
 * invoked several times, each time for a different subset of services.
 * It returns silently when every service is already running or when no
 * known service flag is set.
 *
 * @param svc bitwise OR of any of the following constants <BR>
 * DEFAULT - starts all services <BR>
 * MBR_RX_SEQ - starts the membership receiver <BR>
 * MBR_TX_SEQ - starts the membership broadcaster <BR>
 * SND_TX_SEQ - starts the replication transmitter <BR>
 * SND_RX_SEQ - starts the replication receiver <BR>
 * @throws ChannelException if the requested level is already started or a
 *         startup error occurs (any other exception is wrapped)
 */
protected synchronized void internalStart(int svc) throws ChannelException {
    try {
        // Strip every flag that is unrelated to this bottom layer.
        svc = svc & Channel.DEFAULT;
        if (startLevel == Channel.DEFAULT) {
            return; // all components are already up
        }
        if (svc == 0) {
            return; // nothing requested
        }
        if ((svc & startLevel) == svc) {
            throw new ChannelException("Channel already started for level:"+svc);
        }

        boolean startedAny = false;

        // The receiver is started first so that the port it listens on can
        // be coordinated with the local membership settings.
        if ((svc & Channel.SND_RX_SEQ) == Channel.SND_RX_SEQ) {
            clusterReceiver.setMessageListener(this);
            clusterReceiver.start();
            //synchronize, big time FIXME
            membershipService.setLocalMemberProperties(getClusterReceiver().getHost(), getClusterReceiver().getPort());
            startedAny = true;
        }
        if ((svc & Channel.SND_TX_SEQ) == Channel.SND_TX_SEQ) {
            clusterSender.start();
            startedAny = true;
        }
        if ((svc & Channel.MBR_RX_SEQ) == Channel.MBR_RX_SEQ) {
            membershipService.setMembershipListener(this);
            membershipService.start(MembershipService.MBR_RX);
            startedAny = true;
        }
        if ((svc & Channel.MBR_TX_SEQ) == Channel.MBR_TX_SEQ) {
            membershipService.start(MembershipService.MBR_TX);
            startedAny = true;
        }

        if (!startedAny) {
            throw new IllegalArgumentException("Invalid start level, valid levels are:SND_RX_SEQ,SND_TX_SEQ,MBR_TX_SEQ,MBR_RX_SEQ");
        }
        // Remember which services are now running.
        startLevel = (startLevel | svc);
    } catch (ChannelException cx) {
        throw cx;
    } catch (Exception x) {
        throw new ChannelException(x);
    }
}
上面的代码,英文注释也比较清楚,我就不画蛇添足再做解释了。
接下来就是:
// Launch the heartbeat thread once, and only when heartbeats are enabled.
if (heartbeat && hbthread == null) {
    hbthread = new HeartbeatThread(this, heartbeatSleeptime);
    hbthread.start();
}
心跳线程启动后,会周期性地调用 heartbeat(),相关代码片段如下(方法签名在原文中缺失):
// Body of a heartbeat() implementation (the method header is not shown here):
// delegate to the parent first, then forward the heartbeat to every
// membership listener that implements Heartbeat.
super.heartbeat();
Iterator i = membershipListeners.iterator();
while ( i.hasNext() ) {
Object o = i.next();
if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
}
// Same forwarding for the channel listeners.
i = channelListeners.iterator();
while ( i.hasNext() ) {
Object o = i.next();
if ( o instanceof Heartbeat ) ((Heartbeat)o).heartbeat();
}
// NOTE(review): the remaining lines look like the bodies of further heartbeat()
// overrides along the call chain; their method headers are missing - confirm
// against the Tomcat source.
super.heartbeat();
super.heartbeat();
// Basic (non-forced) membership check.
checkMembers(false);
checkMembers(boolean checkAll)
/**
 * Verifies the known members, creating the membership lazily on first use.
 * Failures are logged as warnings and never propagated to the caller.
 *
 * @param checkAll {@code true} to run the forced check, {@code false} to
 *        run the basic check
 */
public void checkMembers(boolean checkAll) {
    try {
        if (membership == null) {
            setupMembership();
        }
        synchronized (membership) {
            if (checkAll) {
                performForcedCheck();
            } else {
                performBasicCheck();
            }
        }
    } catch (Exception x) {
        log.warn("Unable to perform heartbeat on the TcpFailureDetector.",x);
    }
}
performBasicCheck()
protected void performBasicCheck() {
//update all alive times
Member[] members = super.getMembers();
for (int i = 0; members != null && i < members.length; i++) {
if (membership.memberAlive( (MemberImpl) members)) { // 更新当前的节点的最近心跳时间戳lastHeardFrom。
//we don't have this one in our membership, check to see if he/she is alive
if (memberAlive(members)) {
log.warn("Member added, even though we werent notified:" + members);
super.memberAdded(members);
} else {
membership.removeMember( (MemberImpl) members);
} //end if
} //end if
} //for
//check suspect members if they are still alive,
//if not, simply issue the memberDisappeared message
MemberImpl[] keys = (MemberImpl[]) removeSuspects.keySet().toArray(new MemberImpl);
for (int i = 0; i < keys.length; i++) {
MemberImpl m = (MemberImpl) keys;
if (membership.getMember(m) != null && (!memberAlive(m))) {
membership.removeMember(m);
super.memberDisappeared(m);
removeSuspects.remove(m);
if(log.isInfoEnabled())
log.info("Suspect member, confirmed dead.["+m+"]");
} //end if
}
//check add suspects members if they are alive now,
//if they are, simply issue the memberAdded message
keys = (MemberImpl[]) addSuspects.keySet().toArray(new MemberImpl);
for (int i = 0; i < keys.length; i++) {
MemberImpl m = (MemberImpl) keys;
if ( membership.getMember(m) == null && (memberAlive(m))) {
membership.memberAlive(m);
super.memberAdded(m);
addSuspects.remove(m);
if(log.isInfoEnabled())
log.info("Suspect member, confirmed alive.["+m+"]");
} //end if
}
}
比较乱。。。
启动进程
Daemon Thread
Daemon Thread
Daemon Thread
Daemon Thread
总结下:
每间隔500毫秒,将本身的节点信息进行组播。
节点接收组播信息。
发送和接收后,都调用下面的方法:
/**
 * Collects the members that have expired and reports each of them as
 * disappeared on its own thread. The whole pass runs under
 * {@code expiredMutex}.
 *
 * The {@code expired[i]} subscripts were missing from this listing (the
 * array itself was assigned to a MemberImpl and logged) and have been
 * restored.
 */
protected void checkExpired() {
    synchronized (expiredMutex) {
        // NOTE(review): the article states timeToExpiration defaults to 3 seconds - confirm.
        MemberImpl[] expired = membership.expire(timeToExpiration);
        // Members returned here are treated as failed and dropped from the cluster.
        for (int i = 0; i < expired.length; i++) {
            final MemberImpl member = expired[i];
            if (log.isDebugEnabled())
                log.debug("Mcast exipremember " + expired[i]);
            try {
                // Notify on a separate thread so this loop does not wait for
                // the removal processing of each member.
                Thread t = new Thread() {
                    public void run() {
                        service.memberDisappeared(member);
                    }
                };
                t.start();
            } catch (Exception x) {
                log.error("Unable to process member disappeared message.", x);
            }
        }
    }
}
下面的 expire 方法用于获取超过到期时间(默认3秒)仍未收到心跳的成员:
/**
* Runs a refresh cycle and returns a list of members that has expired.
* This also removes the members from the membership, in such a way that
* getMembers() = getMembers() - expire()
* @param maxtime - the max time a member can remain unannounced before it is considered dead.
* @return the list of expired members
*/
public synchronized MemberImpl[] expire(long maxtime) {
if(!hasMembers() )
return EMPTY_MEMBERS;
ArrayList list = null;
Iterator i = map.values().iterator();
while(i.hasNext()) {
MbrEntry entry = (MbrEntry)i.next();
if( entry.hasExpired(maxtime) ) {
if(list == null) // only need a list when members are expired (smaller gc)
list = new java.util.ArrayList();
list.add(entry.getMember());
}
}
if(list != null) {
MemberImpl[] result = new MemberImpl;
list.toArray(result);
for( int j=0; j<result.length; j++) {
removeMember(result);
}
return result;
} else {
return EMPTY_MEMBERS ;
}
}
判断时间是否到期
/**
 * Tells whether this entry has expired.
 *
 * @param maxtime the time threshold, in milliseconds
 * @return {@code true} when more than {@code maxtime} milliseconds have
 *         elapsed since {@code lastHeardFrom}
 */
public boolean hasExpired(long maxtime) {
    final long silentFor = System.currentTimeMillis() - lastHeardFrom;
    if (silentFor > maxtime) {
        return true;
    }
    return false;
}
下面是session的同步:Engine在后台启动一个线程,不间断地执行下面的job,对session进行同步:
下面是addToQueue方法:
/**
 * Wraps the message, its destinations and the payload into a LinkObject
 * and hands the actual send to the executor.
 *
 * @param msg the message to send
 * @param destination the members the message is addressed to
 * @param payload the interceptor payload that accompanies the message
 * @return always {@code true}
 */
public boolean addToQueue(ChannelMessage msg, Member[] destination, InterceptorPayload payload) {
    final LinkObject queued = new LinkObject(msg, destination, payload);
    executor.execute(new Runnable() {
        public void run() {
            sendAsyncData(queued);
        }
    });
    return true;
}
sendAsyncData(obj),异步发送信息:
NIO发送信息:
/**
 * Performs one select pass over the selector and lets every ready sender
 * make progress on its message.
 *
 * @param selectTimeOut timeout passed to the selector's select call
 * @param maxAttempts maximum number of attempts per sender; retries are only
 *        made when this is greater than zero
 * @param waitForAck passed through to each sender's process call
 * @param msg the message being sent (used here only for trace logging)
 * @return the number of senders that completed during this pass; 0 when no
 *         key was selected
 * @throws IOException declared by the method; presumably raised by the select call - TODO confirm
 * @throws ChannelException when a send fails and the sender is disconnected,
 *         or when a send fails and no retry is allowed; the failing
 *         destination is recorded as a faulty member
 */
private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException {
int completed = 0;
int selectedKeys = selector.select(selectTimeOut);
// Nothing became ready within the timeout.
if (selectedKeys == 0) {
return 0;
}
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
// Remove the key from the selected set so it is not handled twice.
it.remove();
int readyOps = sk.readyOps();
// Clear the operations that just became ready from the key's interest set.
sk.interestOps(sk.interestOps() & ~readyOps);
NioSender sender = (NioSender) sk.attachment();
try {
// process() returning true is counted as a completed send.
if (sender.process(sk,waitForAck)) {
completed++;
sender.setComplete(true);
if ( Logs.MESSAGES.isTraceEnabled() ) {
Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+sender.getDestination().getName());
}
SenderState.getSenderState(sender.getDestination()).setReady();
}//end if
} catch (Exception x) {
SenderState state = SenderState.getSenderState(sender.getDestination());
int attempt = sender.getAttempt()+1;
// The retry decision uses the attempt count before it is incremented.
boolean retry = (sender.getAttempt() <= maxAttempts && maxAttempts>0);
synchronized (state) {
//sk.cancel();
// Escalate the destination's state: suspect -> failing, ready -> suspect.
if (state.isSuspect()) state.setFailing();
if (state.isReady()) {
state.setSuspect();
if ( retry )
log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect and retrying.");
else
log.warn("Member send is failing for:" + sender.getDestination().getName() +" ; Setting to suspect.", x);
}
}
// A disconnected sender is never retried.
if ( !isConnected() ) {
log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}
// Read the message before disconnecting so it can be set again after the reconnect.
byte[] data = sender.getMessage();
if ( retry ) {
try {
sender.disconnect();
sender.connect();
sender.setAttempt(attempt);
sender.setMessage(data);
}catch ( Exception ignore){
// A failed reconnect is not propagated; the destination is marked as failing instead.
state.setFailing();
}
} else {
ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
cx.addFaultyMember(sender.getDestination(),x);
throw cx;
}//end if
}
}
return completed;
}
上面是tomcat启动时会执行的过程;此外,在有请求访问的情况下,session同步还有一个步骤:
至此,tomcat的集群源码大致分析完毕。
页:
[1]