设为首页 收藏本站
查看: 332|回复: 0

[经验分享] Tomcat源码之JIoEndpoint

[复制链接]

尚未签到

发表于 2017-1-26 09:11:09 | 显示全部楼层 |阅读模式
Tomcat中提供了多种处理Socket的实现:JIoEndpoint、AprEndpoint和NioEndpoint。其中JIoEndpoint是最常见的一种实现方式。
JIOEndpoint中的线程有3部分:Socket侦听线程、监控线程和Executor。
Socket侦听线程
该线程由内部类Acceptor实现其Runnable接口,使用JDK的ServerSocket类监听某个端口,当socket连接进来时,构建一个task,交给Executor来处理。
监控线程
该线程由AsyncTimeout实现其Runnable接口,不断遍历Request队列,为Timeout的Request构造一个TIMEOUT task,交给Executor来处理。
Executor
Executor负责提供工作线程用来处理侦听线程和监控线程构造的task。该task会完成对Request回应。如果用户没有指定Executor,JIoEndpoint中会自动创建org.apache.tomcat.util.threads.ThreadPoolExecutor作为默认实现。

按照工作流程的顺序介绍一下各个部分:
初始化:bind()
该方法在父类AbstractEndpoint中的init()被调用,完成对ServerSocketFactory和ServerSocket初始化。
@Override
    public void bind() throws Exception {
        // Initialize thread count defaults for acceptor
        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1;
        }
        if (serverSocketFactory == null) {
            if (isSSLEnabled()) {
                ServerSocketFactory = handler.getSslImplementation().getServerSocketFactory(this);
            } else {
                ServerSocketFactory = new DefaultServerSocketFactory(this);
            }
        }
        if (serverSocket == null) {
            try {
                if (getAddress() == null) {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog());
                } else {
                    serverSocket = serverSocketFactory.createSocket(getPort(),
                            getBacklog(), getAddress());
                }
            } catch (BindException orig) {
                String msg;
                if (getAddress() == null)
                    msg = orig.getMessage() + " <null>:" + getPort();
                else
                    msg = orig.getMessage() + " " +
                            getAddress().toString() + ":" + getPort();
                BindException be = new BindException(msg);
                be.initCause(orig);
                throw be;
            }
        }
    }
启动:startInternal()
该方法在父类AbstractEndpoint中的start()被调用,生成线程资源池Executor、启动侦听线程(acceptor)和监控线程(async timeout)。
@Override
    public void startInternal() throws Exception {
        if (!running) {
            running = true;
            paused = false;
            // Create worker collection
            if (getExecutor() == null) {
                createExecutor();
            }
           
            initializeConnectionLatch();
            // Start acceptor threads
            for (int i = 0; i < acceptorThreadCount; i++) {
                Thread acceptorThread = new Thread(new Acceptor(),
                        getName() + "-Acceptor-" + i);
                acceptorThread.setPriority(threadPriority);
                acceptorThread.setDaemon(getDaemon());
                acceptorThread.start();
            }
           
            // Start async timeout thread
            Thread timeoutThread = new Thread(new AsyncTimeout(),
                    getName() + "-AsyncTimeout");
            timeoutThread.setPriority(threadPriority);
            timeoutThread.setDaemon(true);
            timeoutThread.start();
        }
    }

至此,JIoEndpoint初始化完成,并完成启动,等待用户发起连接并进行处理。一个用户连接从侦听到处理完成主要经历以下几个Runnable接口:Acceptor、AsyncTimeout和SocketProcessor。

Acceptor内部类
Acceptor实现了Runnable接口,由侦听调用。其主要的功能就是通过ServerSocket.accept方法,得到socket,然后调用JIOEndpoint的processSocket方法来处理。processSocket属于异步调用,Acceptor会迅速返回,然后继续在accept处等待新的socket。在accept前,用awaitConnection()来保证连接数不超过最大处理数,否则线程会wait,直到有连接被处理完。acceptSocket()线程安全,所以在此不用做synchronized处理。
    protected class Acceptor implements Runnable {

      
        @Override
        public void run() {
            int errorDelay = 0;
            // Loop until we receive a shutdown command
            while (running) {
                // Loop if endpoint is paused
                while (paused && running) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                if (!running) {
                    break;
                }
                try {
                    //if we have reached max connections, wait
                    awaitConnection();
                    Socket socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSocketFactory.acceptSocket(serverSocket);
                    } catch (IOException ioe) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;
                    // Configure the socket
                    if (setSocketOptions(socket)) {
                        // Hand this socket off to an appropriate processor
                        if (!processSocket(socket)) {
                            // Close socket right away
                            try {
                                socket.close();
                            } catch (IOException e) {
                                // Ignore
                            }
                        } else {
                            countUpConnection();
                        }
                    } else {
                        // Close socket right away
                        try {
                            socket.close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    }
                } catch (IOException x) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), x);
                    }
                } catch (NullPointerException npe) {
                    if (running) {
                        log.error(sm.getString("endpoint.accept.fail"), npe);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
                // The processor will recycle itself when it finishes
            }
        }
    }

在run()方法中使用JIoEndpoint中的processSocket(),该方法主要是产生1个SocketProcessor交给Exector来处理。
    protected boolean processSocket(Socket socket) {
        // Process the request from this socket
        try {
            SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
            wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
            // During shutdown, executor may be null - avoid NPE
            if (!running) {
                return false;
            }
            getExecutor().execute(new SocketProcessor(wrapper));
        } catch (RejectedExecutionException x) {
            log.warn("Socket processing request was rejected for:"+socket,x);
            return false;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            // This means we got an OOM or similar creating a thread, or that
            // the pool and its queue are full
            log.error(sm.getString("endpoint.process.fail"), t);
            return false;
        }
        return true;
    }
SocketProcessor内部类
该class同样只有一个run方法,把request最终交给handler,真正的处理就交给coyote了。

    protected class SocketProcessor implements Runnable {
      
        protected SocketWrapper<Socket> socket = null;
        protected SocketStatus status = null;
      
        public SocketProcessor(SocketWrapper<Socket> socket) {
            if (socket==null) throw new NullPointerException();
            this.socket = socket;
        }
        public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
            this(socket);
            this.status = status;
        }
        @Override
        public void run() {
            boolean launch = false;
            synchronized (socket) {
                try {
                    SocketState state = SocketState.OPEN;
                    try {
                        // SSL handshake
                        serverSocketFactory.handshake(socket.getSocket());
                    } catch (Throwable t) {
                        ExceptionUtils.handleThrowable(t);
                        if (log.isDebugEnabled()) {
                            log.debug(sm.getString("endpoint.err.handshake"), t);
                        }
                        // Tell to close the socket
                        state = SocketState.CLOSED;
                    }
                       
                    if ( (state != SocketState.CLOSED) ) {
                        state = (status==null)?handler.process(socket):handler.process(socket,status);
                    }
                    if (state == SocketState.CLOSED) {
                        // Close socket
                        if (log.isTraceEnabled()) {
                            log.trace("Closing socket:"+socket);
                        }
                        countDownConnection();
                        try {
                            socket.getSocket().close();
                        } catch (IOException e) {
                            // Ignore
                        }
                    } else if (state == SocketState.ASYNC_END ||
                            state == SocketState.OPEN){
                        socket.setKeptAlive(true);
                        socket.access();
                        launch = true;
                    } else if (state == SocketState.LONG) {
                        socket.access();
                        waitingRequests.add(socket);
                    }
                } finally {
                    if (launch) {
                        try {
                            getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN));
                        } catch (NullPointerException npe) {
                            if (running) {
                                log.error(sm.getString("endpoint.launch.fail"),
                                        npe);
                            }
                        }
                    }
                }
            }
            socket = null;
            // Finish up this request
        }
      
    }
至此,一个正常的request处理流程就完成了。JIoEndpoint中还有一个Runnable内部类,该类主要是找出当然任务队列中已经超时的request,加上timeout状态标记,交给Exector处理。
AsyncTimeout内部类

   
    protected class AsyncTimeout implements Runnable {
      
        @Override
        public void run() {
            // Loop until we receive a shutdown command
            while (running) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore
                }
                long now = System.currentTimeMillis();
                Iterator<SocketWrapper<Socket>> sockets =
                    waitingRequests.iterator();
                while (sockets.hasNext()) {
                    SocketWrapper<Socket> socket = sockets.next();
                    long access = socket.getLastAccess();
                    if ((now-access)>socket.getTimeout()) {
                        processSocketAsync(socket,SocketStatus.TIMEOUT);
                    }
                }
               
                // Loop if endpoint is paused
                while (paused && running) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
               
            }
        }
    }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-333528-1-1.html 上篇帖子: 浅析Tomcat之StandardContext 下篇帖子: 在Tomcat 6.0.X下安装Tomcat Administration Web Application
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表