设为首页 收藏本站
查看: 280|回复: 0

[经验分享] Tomcat接收请求

[复制链接]

尚未签到

发表于 2017-1-19 07:06:55 | 显示全部楼层 |阅读模式
  1. 启动Tomcat后, 在哪里接收Request?
  启动Tomcat时,部署完webapp 后,就会启动connector ,启动这个连接器,也就意味着会启动一个线程来接收请求,
  具体涉及的类:
  Http11Protocol
  org.apach.tomcat.uitl.net ,
  JIoEndPoint , WorkStack (pool), Acceptor(Runnable) 
  首先,启动Http11Protocol会调用JIoEndpoint中的start();JIO会new监听器线程,监控连接;

public void start()
throws Exception {
// Initialize socket if not done before
if (!initialized) {
init();
}
if (!running) {
running = true;
paused = false;
// Create worker collection
if (executor == null) {
workers = new WorkerStack(maxThreads);
}
// Start acceptor threads
for (int i = 0; i < acceptorThreadCount; i++) {
Thread acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor-" + i);
acceptorThread.setPriority(threadPriority);
acceptorThread.setDaemon(daemon);
acceptorThread.start();
}
}
}
  Acceptor(实现Runnable接口) 的 run() :启动http11Protocol ,就会阻塞在acceptSocket()上,直到有连接传入,得到Socket , 传递给Processor

/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
* 监控连接,并把他们交给processor处理
*/
public void run() {
// Loop until we receive a shutdown command
// 只要不是shutdown 命令 他就一直在运行 等候请求传入
while (running) {
// Loop if endpoint is paused
while (paused) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore
}
}
// Accept the next incoming connection from the server socket
try { // 阻塞在这个地方 直到有连接传入
Socket socket = serverSocketFactory.acceptSocket(serverSocket);
serverSocketFactory.initSocket(socket);
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) { // 处理socket
// Close socket right away
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}
}catch ( IOException x ) {
if ( running ) log.error(sm.getString("endpoint.accept.fail"), x);
} catch (Throwable t) {
log.error(sm.getString("endpoint.accept.fail"), t);
}
// The processor will recycle itself when it finishes
}
}
}

  processSocket() : 取得线程,处理Socket

/**
* Process given socket.
*/
protected boolean processSocket(Socket socket) {
try {
if (executor == null) { // exector是线程池
getWorkerThread().assign(socket);
} else {
executor.execute(new SocketProcessor(socket));// SocketProcess 实现了Runnable接口 ,process处理!
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

  可能你会举得到这里,如果不通过线程池处理,那么是怎么把request传递给servlet的了?
  通过分析下面的源码,好像getWorkThread().assign(socket) 并没有调用处理socket的方法?
  注意createWorkThread()中的“newWorkerThread() ” , 通过这个方法,我们会创建一个线程,并且会启动,Work类的源代码 它是实现了Runnable接口的!
  所以,总结Tomcat接收一个请求的过程 :
   Tomcat启动,会启动一个Acceptor线程来监听请求 , 请求传入了时,得到相应的Socket,然后通过workStack 或者 Executor的方式,产生一个新的线程来处理,
       在通过workStack 产生线程到处理的过程 : getWorkerThread() 调用createWorkerThread() 调用 newWorkerThread会产生一个Work thread , 并且start,但是
       因为socket = await() , avaliable = false , 会阻塞,通过调用 assign() , 使得available = true 把最新的socket传递进去, run()方法运行,处理socket , 这就是assign()的作用。
       但是 , 为什么要这么做了,直接new 出线程,然后run就是,就像executor一样!为什么这中间要添加这些额外的操作了?
         为了保证线程得到的是最新的socket!!
  newWorkerThread() :

/**
* Create and return a new processor suitable for processing HTTP
* requests and returning the corresponding responses.
*/
protected Worker newWorkerThread() {
Worker workerThread = new Worker();
workerThread.start(); //注意这里!!!!
return (workerThread);
}

  getWrokThread() : // 不通过线程池的方式取得线程,将线程放在一个workStack中

/**
* Return a new worker thread, and block while to worker is available.
*/
protected Worker getWorkerThread() {
// Allocate a new worker thread
synchronized (workers) { // 注意同步
Worker workerThread;
while ((workerThread = createWorkerThread()) == null) {
try {
workers.wait(); // 如果不能取的线程,wait,因为你在server.xml中配置了maxThread;
} catch (InterruptedException e) {
// Ignore
}
}
return workerThread;
}
}

  Work内部类: // 一个处理rquest的线程就是一个worker!
  里面两个特别重要的方法,assign()  和 await() 都是同步
   assgin 传递最新的socket!
     而 await 则是在等候最新的socket!

protected class Worker implements Runnable {
protected Thread thread = null;
protected boolean available = false;
protected Socket socket = null;
// 将availabe 置为 true , 并唤醒所有等待的线程我知道了,assign的作用就是把最新的socket传递进来
synchronized void assign(Socket socket) {
// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();
}
// 等候最新的socket
private synchronized Socket await() {
// Wait for the Connector to provide a new Socket
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}
// Notify the Connector that we have received this Socket
Socket socket = this.socket;
available = false;  // 将available置为false!
notifyAll();
return (socket);
}
// run方法
public void run() {
// Process requests until we receive a shutdown signal
while (running) {
// Wait for the next socket to be assigned
Socket socket = await(); // 调用这个方法时,除非调用assign()方法使的available为true,否则就会一直等待!而且现在锁已经加上来了,其他线程是不能够进来了!
if (socket == null) //
continue;
// Process the request from this socket
if (!setSocketOptions(socket) || !handler.process(socket)) { // 这两个方法跟线程池处理的一样
// Close socket
try {
socket.close();
} catch (IOException e) {
}
}
// Finish up this request
socket = null;
recycleWorkerThread(this);
}
}
public void start() {
thread = new Thread(this);
thread.setName(getName() + "-" + (++curThreads));
thread.setDaemon(true);
thread.start();
}
}

  createWorkThread() ; 创建一个新的线程,调用newWorkerThread() 启动线程

/**
* Create (or allocate) and return an available processor for use in
* processing a specific HTTP request, if possible.  If the maximum
* allowed processors have already been created and are in use, return
* <code>null</code> instead.
*/
protected Worker createWorkerThread() {
synchronized (workers) {
if (workers.size() > 0) { // 线程栈,可用的线程
curThreadsBusy++;
return workers.pop();
}
if ((maxThreads > 0) && (curThreads < maxThreads)) {
curThreadsBusy++;
if (curThreadsBusy == maxThreads) {
log.info(sm.getString("endpoint.info.maxThreads",
Integer.toString(maxThreads), address,
Integer.toString(port)));
}
return (newWorkerThread()); // 注意这个方法!!
} else {
if (maxThreads < 0) {
curThreadsBusy++;
return (newWorkerThread());
} else {
return (null);
}
}
}
}
  SocketProcess的Run() :// 处理socket ,

public void run() {
// Process the request from this socket
if (!setSocketOptions(socket) || !handler.process(socket)) { // handler 处理socket 通过这个方法就会将request传递给servlet
// Close socket
try {
socket.close();
} catch (IOException e) {
}
}
// Finish up this request
socket = null;
}
  2. 接收请求后,是怎么传递给相应的servlet处理的?
  通过调用org.apache.coyote.http11.http11Protocol.process(socket) 方法!!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-330344-1-1.html 上篇帖子: tomcat的comet 下篇帖子: Tomcat
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表