xuol001 发表于 2017-1-25 13:20:03

tomcat源码解析--SessionManager

  Manager 是tomcat 用于管理session的操作的定义,ManagerBase 设置默认参数,实现一些公用方法
  StandardManager 是真正tomcat用来处理session的管理器,它实现了lifecycle接口,具体的关系如下
  

 其中clusterManager是定义集群环境下session管理的操作,clusterManagerBase继承了ManagerBase,实现了ClusterManager,是真正集群环境中使用的session管理器
  Manager中的操作:
  

 

  每个managerbase都会与一个context绑定,初始化init方法

if( initialized ) return;
initialized=true;      
log = LogFactory.getLog(ManagerBase.class);
if( oname==null ) {
try {
StandardContext ctx=(StandardContext)this.getContainer();
Engine eng=(Engine)ctx.getParent().getParent();
domain=ctx.getEngineName();
distributable = ctx.getDistributable();
StandardHost hst=(StandardHost)ctx.getParent();
String path = ctx.getPath();
if (path.equals("")) {
path = "/";
}   
oname=new ObjectName(domain + ":type=Manager,path="
+ path + ",host=" + hst.getName());
Registry.getRegistry(null, null).registerComponent(this, oname, null );
} catch (Exception e) {
log.error("Error registering ",e);
}
}
// Initialize random number generation
getRandomBytes(new byte);
// Ensure caches for timing stats are the right size by filling with
// nulls.
while (sessionCreationTiming.size() < TIMING_STATS_CACHE_SIZE) {
sessionCreationTiming.add(null);
}
while (sessionExpirationTiming.size() < TIMING_STATS_CACHE_SIZE) {
sessionExpirationTiming.add(null);
}
if(log.isDebugEnabled())
log.debug("Registering " + oname );
  
 ManagerBase 中定义了用于生成session的random 值的方法(值得参考)
  (entropy) << ((i % 8) * 8)

public Random getRandom() {
if (this.random == null) {
// Calculate the new random number generator seed
long seed = System.currentTimeMillis();
long t1 = seed;
char entropy[] = getEntropy().toCharArray();
for (int i = 0; i < entropy.length; i++) {
//这里提供了一个 char 字符 变成long的方法, entropy) << ((i % 8) * 8
long update = ((long) entropy) << ((i % 8) * 8);
seed ^= update;
}
try {
// Construct and seed a new random number generator
Class clazz = Class.forName(randomClass);
this.random = (Random) clazz.newInstance();
this.random.setSeed(seed);
} catch (Exception e) {
// Fall back to the simple case
log.error(sm.getString("managerBase.random", randomClass),
e);
this.random = new java.util.Random();
this.random.setSeed(seed);
}
if(log.isDebugEnabled()) {
long t2=System.currentTimeMillis();
if( (t2-t1) > 100 )
log.debug(sm.getString("managerBase.seeding", randomClass) + " " + (t2-t1));
}
}
return (this.random);
  调用得到32bit 随机字符的方法
  注意,这里调用了org.apache.tomcat.jni.OS的静态方法public static native int random(byte [] buf, int len);来得到随机数组,native关键字是指tomcat在这里调用了外部的非java方法来实现这个调用

public String getEntropy() {
// Calculate a semi-useful value if this has not been set
if (this.entropy == null) {
// Use APR to get a crypto secure entropy value
byte[] result = new byte;
boolean apr = false;
try {
String methodName = "random";
Class paramTypes[] = new Class;
paramTypes = result.getClass();
paramTypes = int.class;
Object paramValues[] = new Object;
paramValues = result;
paramValues = new Integer(32);
Method method = Class.forName("org.apache.tomcat.jni.OS")
.getMethod(methodName, paramTypes);
method.invoke(null, paramValues);
apr = true;
} catch (Throwable t) {
// Ignore
}
if (apr) {
try {
setEntropy(new String(result, "ISO-8859-1"));
} catch (UnsupportedEncodingException ux) {
// ISO-8859-1 should always be supported
throw new Error(ux);
}
} else {
setEntropy(this.toString());
}
}
return (this.entropy);
}
  sessionID的生成方法

protected synchronized String generateSessionId() {
byte random[] = new byte;
String jvmRoute = getJvmRoute();
String result = null;
// Render the result as a String of hexadecimal digits
StringBuffer buffer = new StringBuffer();
do {
int resultLenBytes = 0;
if (result != null) {
buffer = new StringBuffer();
duplicates++;
}
while (resultLenBytes < this.sessionIdLength) {
//生成随机数组
getRandomBytes(random);
//对数组进行加密,默认MD5
random = getDigest().digest(random);
for (int j = 0;
j < random.length && resultLenBytes < this.sessionIdLength;
j++) {
//涉及到数学,不是很懂为啥??
byte b1 = (byte) ((random & 0xf0) >> 4);
byte b2 = (byte) (random & 0x0f);
if (b1 < 10)
buffer.append((char) ('0' + b1));
else
buffer.append((char) ('A' + (b1 - 10)));
if (b2 < 10)
buffer.append((char) ('0' + b2));
else
buffer.append((char) ('A' + (b2 - 10)));
resultLenBytes++;
}
}
if (jvmRoute != null) {
//其中jvmRoute是为了防止tomcat集群导致的sessionId冲突
buffer.append('.').append(jvmRoute);
}
result = buffer.toString();
} while (sessions.containsKey(result));
return (result);
  StandardManager 生命周期:
  start()

if( ! initialized )
//managerbase.init
init();
// Validate and update our current component state
if (started) {
return;
}
lifecycle.fireLifecycleEvent(START_EVENT, null);
started = true;
// Force initialization of the random number generator
if (log.isDebugEnabled())
log.debug("Force random number initialization starting");
String dummy = generateSessionId();
if (log.isDebugEnabled())
log.debug("Force random number initialization completed");
// Load unloaded sessions, if any
try {
load();
} catch (Throwable t) {
log.error(sm.getString("standardManager.managerLoad"), t);
}
  load

if (SecurityUtil.isPackageProtectionEnabled()){
try{
AccessController.doPrivileged( new PrivilegedDoLoad() );
} catch (PrivilegedActionException ex){
Exception exception = ex.getException();
if (exception instanceof ClassNotFoundException){
throw (ClassNotFoundException)exception;
} else if (exception instanceof IOException){
throw (IOException)exception;
}
if (log.isDebugEnabled())
log.debug("Unreported exception in load() "
+ exception);
}
} else {
doLoad();
}
  doload方法

if (log.isDebugEnabled())
log.debug("Start: Loading persisted sessions");
// Initialize our internal data structures
sessions.clear();
// Open an input stream to the specified pathname, if any
//根据pathname得到 session文件
File file = file();
if (file == null)
return;
if (log.isDebugEnabled())
log.debug(sm.getString("standardManager.loading", pathname));
FileInputStream fis = null;
ObjectInputStream ois = null;
Loader loader = null;
ClassLoader classLoader = null;
try {
fis = new FileInputStream(file.getAbsolutePath());
BufferedInputStream bis = new BufferedInputStream(fis);
if (container != null)
loader = container.getLoader();
if (loader != null)
classLoader = loader.getClassLoader();
if (classLoader != null) {
if (log.isDebugEnabled())
log.debug("Creating custom object input stream for class loader ");
ois = new CustomObjectInputStream(bis, classLoader);
} else {
if (log.isDebugEnabled())
log.debug("Creating standard object input stream");
ois = new ObjectInputStream(bis);
}
} catch (FileNotFoundException e) {
if (log.isDebugEnabled())
log.debug("No persisted data file found");
return;
} catch (IOException e) {
log.error(sm.getString("standardManager.loading.ioe", e), e);
if (ois != null) {
try {
ois.close();
} catch (IOException f) {
;
}
ois = null;
}
throw e;
}
// Load the previously unloaded active sessions
synchronized (sessions) {
try {
Integer count = (Integer) ois.readObject();
int n = count.intValue();
if (log.isDebugEnabled())
log.debug("Loading " + n + " persisted sessions");
for (int i = 0; i < n; i++) {
//初始化session
StandardSession session = getNewSession();
session.readObjectData(ois);
session.setManager(this);
sessions.put(session.getIdInternal(), session);
session.activate();
if (!session.isValidInternal()) {
// If session is already invalid,
// expire session to prevent memory leak.
session.setValid(true);
session.expire();
}
sessionCounter++;
}
} catch (ClassNotFoundException e) {
log.error(sm.getString("standardManager.loading.cnfe", e), e);
if (ois != null) {
try {
ois.close();
} catch (IOException f) {
;
}
ois = null;
}
throw e;
} catch (IOException e) {
log.error(sm.getString("standardManager.loading.ioe", e), e);
if (ois != null) {
try {
ois.close();
} catch (IOException f) {
;
}
ois = null;
}
throw e;
} finally {
// Close the input stream
try {
if (ois != null)
ois.close();
} catch (IOException f) {
// ignored
}
// Delete the persistent storage file
if (file != null && file.exists() )
file.delete();
}
}
if (log.isDebugEnabled())
log.debug("Finish: Loading persisted sessions");
  stop 方法 
  先unload,对session 钝化 发出事件通知,写入文件
  然后session过期
  最后销毁

// Write out sessions
try {
unload();
} catch (Throwable t) {
log.error(sm.getString("standardManager.managerUnload"), t);
}
// Expire all active sessions
Session sessions[] = findSessions();
for (int i = 0; i < sessions.length; i++) {
Session session = sessions;
try {
if (session.isValid()) {
session.expire();
}
} catch (Throwable t) {
;
} finally {
// Measure against memory leaking if references to the session
// object are kept in a shared field somewhere
session.recycle();
}
}
// Require a new random number generator if we are restarted
this.random = null;
if( initialized ) {
destroy();
}
  dounload(), 

if (log.isDebugEnabled())
log.debug(sm.getString("standardManager.unloading.debug"));
if (sessions.isEmpty()) {
log.debug(sm.getString("standardManager.unloading.nosessions"));
return; // nothing to do
}
// Open an output stream to the specified pathname, if any
File file = file();
if (file == null)
return;
if (log.isDebugEnabled())
log.debug(sm.getString("standardManager.unloading", pathname));
FileOutputStream fos = null;
ObjectOutputStream oos = null;
try {
fos = new FileOutputStream(file.getAbsolutePath());
oos = new ObjectOutputStream(new BufferedOutputStream(fos));
} catch (IOException e) {
log.error(sm.getString("standardManager.unloading.ioe", e), e);
if (oos != null) {
try {
oos.close();
} catch (IOException f) {
;
}
oos = null;
}
throw e;
}
// Write the number of active sessions, followed by the details
ArrayList list = new ArrayList();
synchronized (sessions) {
if (log.isDebugEnabled())
log.debug("Unloading " + sessions.size() + " sessions");
try {
oos.writeObject(new Integer(sessions.size()));
Iterator elements = sessions.values().iterator();
while (elements.hasNext()) {
StandardSession session =
(StandardSession) elements.next();
list.add(session);
((StandardSession) session).passivate();
session.writeObjectData(oos);
}
} catch (IOException e) {
log.error(sm.getString("standardManager.unloading.ioe", e), e);
if (oos != null) {
try {
oos.close();
} catch (IOException f) {
;
}
oos = null;
}
throw e;
}
}
// Flush and close the output stream
try {
oos.flush();
oos.close();
oos = null;
} catch (IOException e) {
if (oos != null) {
try {
oos.close();
} catch (IOException f) {
;
}
oos = null;
}
throw e;
}
// Expire all the sessions we just wrote
if (log.isDebugEnabled())
log.debug("Expiring " + list.size() + " persisted sessions");
Iterator expires = list.iterator();
while (expires.hasNext()) {
StandardSession session = (StandardSession) expires.next();
try {
session.expire(false);
} catch (Throwable t) {
;
} finally {
session.recycle();
}
}
if (log.isDebugEnabled())
log.debug("Unloading complete");
  destory()

if( oname != null )
Registry.getRegistry(null, null).unregisterComponent(oname);
if (randomIS!=null) {
try {
randomIS.close();
} catch (IOException ioe) {
log.warn("Failed to close randomIS.");
}
randomIS=null;
}
initialized=false;
oname = null;
  ClusterManger 接口方法
  

 包含一些集群获取,消息通知,处理等方法
  DeltaManager 继承了 ClusterManagerBase 方法,是集群的session管理器
  创建session

public Session createSession(String sessionId, boolean distribute) {
if ((maxActiveSessions >= 0) && (sessions.size() >= maxActiveSessions)) {
rejectedSessions++;
throw new TooManyActiveSessionsException(
sm.getString("deltaManager.createSession.ise"),
maxActiveSessions);
}
DeltaSession session = (DeltaSession) super.createSession(sessionId) ;
if (distribute) {
sendCreateSession(session.getId(), session);
}
if (log.isDebugEnabled())
log.debug(sm.getString("deltaManager.createSession.newSession",session.getId(), new Integer(sessions.size())));
return (session);
}
  发送消息

protected void sendCreateSession(String sessionId, DeltaSession session) {
if(cluster.getMembers().length > 0 ) {
SessionMessage msg =
new SessionMessageImpl(getName(),
SessionMessage.EVT_SESSION_CREATED,
null,
sessionId,
sessionId + "-" + System.currentTimeMillis());
if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId));
msg.setTimestamp(session.getCreationTime());
counterSend_EVT_SESSION_CREATED++;
send(msg);
}
}

protected void send(SessionMessage msg) {
if(cluster != null) {
if(doDomainReplication())
cluster.sendClusterDomain(msg);
else
cluster.send(msg);
}
}
  sessionID 序列化操作

protected byte[] serializeSessionId(String sessionId) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeUTF(sessionId);
oos.flush();
oos.close();
return bos.toByteArray();
}
/**
* Load sessionID
* @throws IOException if an input/output error occurs
*/
protected String deserializeSessionId(byte[] data) throws IOException {
ReplicationStream ois = getReplicationStream(data);
String sessionId = ois.readUTF();
ois.close();
return sessionId;
}
  变更sessionID

public void changeSessionId(Session session, boolean notify) {
// original sessionID
String orgSessionID = session.getId();
super.changeSessionId(session);
public void changeSessionId(Session session) {
String oldId = session.getIdInternal();
session.setId(generateSessionId(), false);
String newId = session.getIdInternal();
if (container instanceof ContainerBase) {
((ContainerBase)container).fireContainerEvent(
Context.CHANGE_SESSION_ID_EVENT,
new String[] {oldId, newId});
}
}
  if (notify) { // changed sessionID String newSessionID = session.getId(); try { // serialize sessionID byte[] data = serializeSessionId(newSessionID); // notify change sessionID SessionMessage msg = new SessionMessageImpl(getName(), SessionMessage.EVT_CHANGE_SESSION_ID, data, orgSessionID, orgSessionID + "-" + System.currentTimeMillis()); msg.setTimestamp(System.currentTimeMillis()); counterSend_EVT_CHANGE_SESSION_ID++; send(msg); } catch (IOException e) { log.error(sm.getString("deltaManager.unableSerializeSessionID", newSessionID), e); } } }
  start方法

if (!initialized) init();
// Validate and update our current component state
if (started) {
return;
}
started = true;
lifecycle.fireLifecycleEvent(START_EVENT, null);
// Force initialization of the random number generator
generateSessionId();
// Load unloaded sessions, if any
try {
//the channel is already running
Cluster cluster = getCluster() ;
// stop remove cluster binding
//wow, how many nested levels of if statements can we have ;)
if(cluster == null) {
Container context = getContainer() ;
if(context != null && context instanceof Context) {
//集群环境下 必须有Host 或者 Engine 容器
Container host = context.getParent() ;
if(host != null && host instanceof Host) {
cluster = host.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
} else {
Container engine = host.getParent() ;
if(engine != null && engine instanceof Engine) {
cluster = engine.getCluster();
if(cluster != null && cluster instanceof CatalinaCluster) {
setCluster((CatalinaCluster) cluster) ;
}
} else {
cluster = null ;
}
}
}
}
}
if (cluster == null) {
log.error(sm.getString("deltaManager.noCluster", getName()));
return;
} else {
if (log.isInfoEnabled()) {
String type = "unknown" ;
if( cluster.getContainer() instanceof Host){
type = "Host" ;
} else if( cluster.getContainer() instanceof Engine){
type = "Engine" ;
}
log.info(sm.getString("deltaManager.registerCluster", getName(), type, cluster.getClusterName()));
}
}
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.startClustering", getName()));
//to survice context reloads, as only a stop/start is called, not
// createManager
//在cluster中注册此session管理器
cluster.registerManager(this);
//得到其他context的session
getAllClusterSessions();
} catch (Throwable t) {
log.error(sm.getString("deltaManager.managerLoad"), t);
}
  getAllClusterSessions

if (cluster != null && cluster.getMembers().length > 0) {
long beforeSendTime = System.currentTimeMillis();
Member mbr = findSessionMasterMember();
if(mbr == null) { // No domain member found
return;
}
SessionMessage msg = new SessionMessageImpl(this.getName(),SessionMessage.EVT_GET_ALL_SESSIONS, null, "GET-ALL","GET-ALL-" + getName());
// set reference time
stateTransferCreateSendTime = beforeSendTime ;
// request session state
counterSend_EVT_GET_ALL_SESSIONS++;
stateTransfered = false ;
// FIXME This send call block the deploy thread, when sender waitForAck is enabled
try {
synchronized(receivedMessageQueue) {
receiverQueue = true ;
}
cluster.send(msg, mbr);
if (log.isWarnEnabled()) log.warn(sm.getString("deltaManager.waitForSessionState",getName(), mbr,getStateTransferTimeout()));
// FIXME At sender ack mode this method check only the state transfer and resend is a problem!
waitForSendAllSessions(beforeSendTime);
} finally {
synchronized(receivedMessageQueue) {
for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) {
SessionMessage smsg = (SessionMessage) iter.next();
if (!stateTimestampDrop) {
messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
if (smsg.getEventType() != SessionMessage.EVT_GET_ALL_SESSIONS && smsg.getTimestamp() >= stateTransferCreateSendTime) {
// FIXME handle EVT_GET_ALL_SESSIONS later
messageReceived(smsg,smsg.getAddress() != null ? (Member) smsg.getAddress() : null);
} else {
if (log.isWarnEnabled()) {
log.warn(sm.getString("deltaManager.dropMessage",getName(), smsg.getEventTypeString(),new Date(stateTransferCreateSendTime), new Date(smsg.getTimestamp())));
}
}
}
}      
receivedMessageQueue.clear();
receiverQueue = false ;
}
}
} else {
if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.noMembers", getName()));
}
  整个集群环境下的session管理,可以总结为,集群环境下的context都使用DeltaManager作为管理器,管理DeltaSession, 同时不同cluster之间使用 DeltaRequest 封装 SessionMessage来传递消息
  然后ClusterSessionListener 通过在 ChannelListener中的messageReceived 事件来 触发DeltaManager中的messageReceived事件
  具体见图:
  

 
页: [1]
查看完整版本: tomcat源码解析--SessionManager