
Tomcat thread model: a source-code walkthrough

逍遥壮士 · Published 2022-12-01 · Column: 技术趋势

Previous post: Tomcat overall startup flow, a source-code walkthrough


What is the Tomcat thread model?

Tomcat 8 and later default to NIO. Tomcat's connectors support four I/O models for accepting and handling client requests: BIO, NIO, NIO2 (AIO) and APR. Accepting new connections is done by a single Acceptor thread, while the read/write events of established connections are handed off to a dedicated worker thread pool.

The roles of BIO, NIO, NIO2 and APR, and how they differ

Comparison of the four:

- BIO: blocking I/O model; requests are handled serially (one thread per connection); protocol class org.apache.coyote.http11.Http11Protocol.
- NIO: synchronous non-blocking I/O model; requests are handled in parallel; protocol class org.apache.coyote.http11.Http11NioProtocol.
- NIO2: asynchronous I/O model; requests are handled in parallel; protocol class org.apache.coyote.http11.Http11Nio2Protocol.
- APR: synchronous non-blocking I/O like NIO, but the I/O is performed through the native Apache Portable Runtime library (JNI) rather than Java NIO; requests are handled in parallel; protocol class org.apache.coyote.http11.Http11AprProtocol.

Tomcat limits concurrent connections with LimitLatch, which is built on the AQS (AbstractQueuedSynchronizer) concurrency framework. The default maxThreads is 200, and maxConnections defaults to 8192 in current releases (10000 in older ones); both can be configured on the <Connector> element in server.xml. In BIO mode the maximum number of connections is tied 1:1 to the maximum number of threads.
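
As a concrete illustration, the same knobs can be set programmatically with embedded Tomcat. This is only a minimal sketch, assuming the embedded org.apache.catalina.startup.Tomcat API is on the classpath; the class name NioTomcatDemo, the port and the property values are arbitrary examples, not recommendations.

import org.apache.catalina.LifecycleException;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.startup.Tomcat;

public class NioTomcatDemo {
    public static void main(String[] args) throws LifecycleException {
        Tomcat tomcat = new Tomcat();

        // Pick the I/O model explicitly; passing "HTTP/1.1" instead would trigger the
        // auto-selection shown in the next section (NIO, or APR if the native library is loaded).
        Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");
        connector.setPort(8080);                          // example port
        // These correspond to the maxThreads / maxConnections attributes in server.xml.
        connector.setProperty("maxThreads", "200");
        connector.setProperty("maxConnections", "8192");

        tomcat.getService().addConnector(connector);
        tomcat.start();
        tomcat.getServer().await();
    }
}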

Source code walkthrough

How is Tomcat's default thread model selected and configured?

First, let's confirm how the thread model is configured at startup.

From the previous article we know that Tomcat is started through Bootstrap's main method, which eventually reaches org.apache.catalina.startup.Catalina#load(), where the following call is made.

Code location: org.apache.tomcat.util.digester.Digester#parse(org.xml.sax.InputSource)

digester.parse(inputSource);

This call parses server.xml and decides which thread model the system is initialized with. Tomcat 7 and earlier default to BIO, while Tomcat 8 and later default to NIO (8.5 removes BIO entirely). I am using 8.5 here, so it is NIO; see below.

Code location: org.apache.catalina.connector.Connector#Connector(java.lang.String)

The default protocol string is HTTP/1.1. On Tomcat 7 and earlier it mapped to the BIO handler; on Tomcat 8 and later it resolves to NIO, or to APR when the native library is available.


public Connector(String protocol) {
    // Resolve the protocol name to a handler class; on Tomcat 8+ HTTP/1.1 resolves to NIO (BIO on Tomcat 7 and earlier)
    setProtocol(protocol);
    // Instantiate protocol handler
    ProtocolHandler p = null;
    try {
        Class<?> clazz = Class.forName(protocolHandlerClassName);
        p = (ProtocolHandler) clazz.getConstructor().newInstance();
    } catch (Exception e) {
        log.error(sm.getString(
                "coyoteConnector.protocolHandlerInstantiationFailed"), e);
    } finally {
        this.protocolHandler = p;
    }
    // URI charset: UTF-8 by default, ISO-8859-1 in strict servlet compliance mode
    if (Globals.STRICT_SERVLET_COMPLIANCE) {
        uriCharset = StandardCharsets.ISO_8859_1;
    } else {
        uriCharset = StandardCharsets.UTF_8;
    }

    // Default for Connector depends on this (deprecated) system property
    if (Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.buf.UDecoder.ALLOW_ENCODED_SLASH", "false"))) {
        encodedSolidusHandling = EncodedSolidusHandling.DECODE;
    }
}

Since the connector protocol is configured in my server.xml, the call ends up in:

org.apache.catalina.connector.Connector#setProtocol

/**
 * Set the Coyote protocol which will be used by the connector.
 *
 * @param protocol The Coyote protocol name
 *
 * @deprecated Will be removed in Tomcat 9. Protocol must be configured via
 * the constructor
 * Resolves the protocol name to the appropriate protocol handler class (NIO or APR)
 */
@Deprecated
public void setProtocol(String protocol) {

    boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
            AprLifecycleListener.getUseAprConnector();

    if ("HTTP/1.1".equals(protocol) || protocol == null) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else {
            // Tomcat 8+ defaults to NIO here
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        }
    } else if ("AJP/1.3".equals(protocol)) {
        if (aprConnector) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        } else {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    } else {
        setProtocolHandlerClassName(protocol);
    }
}
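
A quick throwaway way to see this mapping in action, assuming the Tomcat 8.5+ jars are on the classpath; ProtocolMappingDemo is just a scratch class for this article, not Tomcat code.

import org.apache.catalina.connector.Connector;

public class ProtocolMappingDemo {
    public static void main(String[] args) {
        // "HTTP/1.1" resolves to Http11NioProtocol on Tomcat 8+
        // (Http11AprProtocol if the APR/native library is installed and enabled).
        System.out.println(new Connector("HTTP/1.1")
                .getProtocolHandler().getClass().getName());
        // A fully qualified class name is taken as-is.
        System.out.println(new Connector("org.apache.coyote.http11.Http11Nio2Protocol")
                .getProtocolHandler().getClass().getName());
    }
}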

Tomcat's request-handling threads and the request lifecycle

The Acceptor thread is responsible for accepting new connections and putting them onto the Poller's event queue. Note that the connection limit it honours (maxConnections) defaults to 8192 in this version (10000 in older releases).

The Poller runs as its own thread (older Tomcat versions created up to two Poller threads, min(2, available CPU cores); the version shown here uses a single Poller). It loops continuously, checking whether its event queue has entries and taking them out as they appear: a typical producer-consumer pattern.

SocketProcessor is the actual worker task: it handles the socket's read and write events, deciding what to do from the operations signalled by the SelectionKey, and it runs on the connector's worker thread pool.
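
To make this division of labour concrete, here is a heavily simplified, self-contained sketch of the same acceptor / poller / worker split written against plain java.nio. It is not Tomcat code: class and thread names such as MiniReactor are made up, and real Tomcat re-arms read interest through PollerEvents on the Poller thread rather than directly from the worker.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MiniReactor {
    private final Selector selector;
    private final ServerSocketChannel server;
    // Hand-off queue between the acceptor thread and the poller thread.
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    // Worker pool playing the role of Tomcat's SocketProcessor executor.
    private final ExecutorService workers = Executors.newFixedThreadPool(10);

    public MiniReactor(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));
    }

    // "Acceptor": a single thread that only accepts connections.
    void acceptorLoop() throws IOException {
        while (true) {
            SocketChannel ch = server.accept();      // blocking accept, like serverSocketAccept()
            ch.configureBlocking(false);
            pending.offer(ch);                       // hand the channel off to the poller
            selector.wakeup();                       // wake the poller if it is blocked in select()
        }
    }

    // "Poller": a single thread that multiplexes read readiness.
    void pollerLoop() throws IOException {
        while (true) {
            SocketChannel ch;
            while ((ch = pending.poll()) != null) {
                ch.register(selector, SelectionKey.OP_READ);   // like a PollerEvent OP_REGISTER
            }
            selector.select(1000);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                // Like unreg(): stop watching the fired ops until the worker is done.
                key.interestOps(key.interestOps() & ~key.readyOps());
                workers.execute(() -> handle(key));            // the "SocketProcessor"
            }
        }
    }

    // Worker: read the bytes, echo them back, then re-arm read interest.
    private void handle(SelectionKey key) {
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(4096);
        try {
            int n = ch.read(buf);
            if (n < 0) { key.cancel(); ch.close(); return; }
            buf.flip();
            ch.write(buf);
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();
        } catch (IOException e) {
            key.cancel();
            try { ch.close(); } catch (IOException ignored) { }
        }
    }

    public static void main(String[] args) throws Exception {
        MiniReactor reactor = new MiniReactor(9090);
        Thread acceptor = new Thread(() -> {
            try { reactor.acceptorLoop(); } catch (IOException e) { e.printStackTrace(); }
        }, "mini-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
        reactor.pollerLoop();   // run the poller on the main thread
    }
}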

To verify this, start Tomcat and open http://localhost:8080/.

Then set a breakpoint on the while (!stopCalled) loop inside run; you can observe the following.

The run method keeps looping, listening for new connections and handling each one as it arrives.

org.apache.tomcat.util.net.Acceptor#run

public void run() {

    int errorDelay = 0;
    long pauseStart = 0;

    try {
        // Loop until we receive a shutdown command
        while (!stopCalled) {

            // Loop if endpoint is paused.
            // There are two likely scenarios here.
            // The first scenario is that Tomcat is shutting down. In this
            // case - and particularly for the unit tests - we want to exit
            // this loop as quickly as possible. The second scenario is a
            // genuine pause of the connector. In this case we want to avoid
            // excessive CPU usage.
            // Therefore, we start with a tight loop but if there isn't a
            // rapid transition to stop then sleeps are introduced.
            // < 1ms - tight loop
            // 1ms to 10ms - 1ms sleep
            // > 10ms - 10ms sleep
            while (endpoint.isPaused() && !stopCalled) {
                if (state != AcceptorState.PAUSED) {
                    pauseStart = System.nanoTime();
                    // Entered pause state
                    state = AcceptorState.PAUSED;
                }
                if ((System.nanoTime() - pauseStart) > 1_000_000) {
                    // Paused for more than 1ms
                    try {
                        if ((System.nanoTime() - pauseStart) > 10_000_000) {
                            Thread.sleep(10);
                        } else {
                            Thread.sleep(1);
                        }
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
            }
            // Stop flag: exit the accept loop
            if (stopCalled) {
                break;
            }
            // Mark the acceptor as RUNNING
            state = AcceptorState.RUNNING;

            try {
                // If we have reached max connections, wait (maxConnections defaults to 8192)
                endpoint.countUpOrAwaitConnection();

                // Endpoint might have been paused while waiting for latch
                // If that is the case, don't accept new connections
                if (endpoint.isPaused()) {
                    continue;
                }

                U socket = null;
                try {
                    // Accept the next incoming connection from the server (blocks until one arrives)
                    socket = endpoint.serverSocketAccept();
                } catch (Exception ioe) {
                    // We didn't get a socket
                    endpoint.countDownConnection();
                    if (endpoint.isRunning()) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // Configure the socket
                if (!stopCalled && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                String msg = sm.getString("endpoint.accept.fail");
                // APR specific.
                // Could push this down but not sure it is worth the trouble.
                if (t instanceof Error) {
                    Error e = (Error) t;
                    if (e.getError() == 233) {
                        // Not an error on HP-UX so log as a warning
                        // so it can be filtered out on that platform
                        // See bug 50273
                        log.warn(msg, t);
                    } else {
                        log.error(msg, t);
                    }
                } else {
                        log.error(msg, t);
                }
            }
        }
    } finally {
        stopLatch.countDown();
    }
    state = AcceptorState.ENDED;
}

org.apache.tomcat.util.net.NioEndpoint#setSocketOptions

If the socket is successfully registered with the Poller, the method returns true.

@Override
protected boolean setSocketOptions(SocketChannel socket) {
    NioSocketWrapper socketWrapper = null;
    try {
        // Allocate channel and wrapper
        NioChannel channel = null;
        if (nioChannels != null) {
            // Try to reuse a NioChannel from the cache
            channel = nioChannels.pop();
        }
        // Nothing cached: create a new channel
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            // HTTPS: create an SSL-capable channel, otherwise a plain NIO channel
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(bufhandler, this);
            } else {
                channel = new NioChannel(bufhandler);
            }
        }
        // Create a new socket wrapper
        NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
        // Reset the channel with the new socket and wrapper
        channel.reset(socket, newWrapper);
        // Track the socket in the connections map
        connections.put(socket, newWrapper);
        socketWrapper = newWrapper;
        // Switch the channel to non-blocking mode
        socket.configureBlocking(false);
        // Apply socket options taken from the <Connector> element in server.xml,
        // e.g. send/receive buffer sizes and TCP keep-alive
        socketProperties.setProperties(socket.socket());
        // Set the read timeout
        socketWrapper.setReadTimeout(getConnectionTimeout());
        // Set the write timeout
        socketWrapper.setWriteTimeout(getConnectionTimeout());
        // Set the remaining keep-alive request count
        socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        // Register the newly accepted SocketChannel with the Poller
        poller.register(socketWrapper);
        return true;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        if (socketWrapper == null) {
            destroySocket(socket);
        }
    }
    // Tell to close the socket if needed
    return false;
}
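
For comparison, here is a rough plain-NIO sketch of the kind of configuration setSocketOptions() performs before handing the channel to the Poller. The option values are made-up examples, not Tomcat's defaults, and SocketSetup is an illustrative class rather than Tomcat code.

import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

final class SocketSetup {
    // Roughly what NioEndpoint#setSocketOptions does: non-blocking mode plus the
    // socket options that the <Connector> attributes in server.xml map to.
    static void configure(SocketChannel socket) throws IOException {
        socket.configureBlocking(false);                               // required for the selector
        socket.setOption(StandardSocketOptions.TCP_NODELAY, true);     // tcpNoDelay
        socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);    // soKeepAlive
        socket.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024);   // receive buffer (example value)
        socket.setOption(StandardSocketOptions.SO_SNDBUF, 8 * 1024);   // send buffer (example value)
    }
}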

org.apache.tomcat.util.net.AbstractEndpoint#countUpOrAwaitConnection

protected void countUpOrAwaitConnection() throws InterruptedException {
    // maxConnections defaults to 8192; -1 disables the limit
    if (maxConnections==-1) {
        return;
    }
    // LimitLatch is built on an AQS shared lock; waiting threads queue up FIFO
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) {
        // Increment the connection count, blocking once the limit is reached
        latch.countUpOrAwait();
    }
}
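
The latch behind this is org.apache.tomcat.util.threads.LimitLatch. Below is a simplified re-implementation of its core idea for illustration only; the real class additionally supports releasing all waiters and resizing the limit at runtime.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified imitation of Tomcat's LimitLatch, for illustration only.
public class SimpleLimitLatch {
    private final AtomicLong count = new AtomicLong(0);
    private volatile long limit;
    private final Sync sync = new Sync();

    private class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet();   // over the limit: undo the increment and park
                return -1;
            }
            return 1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            count.decrementAndGet();       // a connection closed: free one slot
            return true;                   // let AQS unpark a waiting acceptor
        }
    }

    public SimpleLimitLatch(long limit) {
        this.limit = limit;
    }

    // Called by the Acceptor before accept(): counts up, or blocks once the limit is hit.
    public void countUpOrAwait() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Called when a connection is closed.
    public long countDown() {
        sync.releaseShared(0);
        return count.get();
    }
}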

Once the above has completed, the connection is handed to the Poller; the Poller flow is as follows.

// Register the newly accepted SocketChannel with the Poller
poller.register(socketWrapper);

org.apache.tomcat.util.net.NioEndpoint.Poller#register

public void register(final NioSocketWrapper socketWrapper) {
    // The wrapper is registered for reads; OP_REGISTER is turned into OP_READ when the event is processed
    socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    PollerEvent event = null;
    if (eventCache != null) {
        event = eventCache.pop();
    }
    if (event == null) {
        event = new PollerEvent(socketWrapper, OP_REGISTER);
    } else {
        // Reuse a cached event and reset it
        event.reset(socketWrapper, OP_REGISTER);
    }
    // Queue the event for the Poller thread
    addEvent(event);
}

private void addEvent(PollerEvent event) {
    // Add the event to the queue
    events.offer(event);
    // If the Poller is currently blocked in select() (counter was -1), wake it up
    if (wakeupCounter.incrementAndGet() == 0) {
        selector.wakeup();
    }
}

The Poller also runs in a loop, constantly checking whether the upstream Acceptor has registered anything for it to process.

org.apache.tomcat.util.net.NioEndpoint.Poller#run

This spot is awkward to step through because the loop never stops; the original post used a screenshot to show the state when events are present.

/**
 * The background thread that adds sockets to the Poller, checks the
 * poller for triggered events and hands the associated socket off to an
 * appropriate processor as events occur.
 * Polls for events in a loop.
 */
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                // Run any PollerEvents queued by register()/addEvent()
                hasEvents = events();
                // Set wakeupCounter to -1; this pairs with the check in addEvent()
                if (wakeupCounter.getAndSet(-1) > 0) {
                    // If we are here, means we have other stuff to do
                    // Do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
            // Either we timed out or we woke up, process events first
            if (keyCount == 0) {
                hasEvents = (hasEvents | events());
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        // Walk the keys that are ready on the selector and process each one
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            // Remove the key from the selected set; its attachment is the socket wrapper
            iterator.remove();
            NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (socketWrapper != null) {
                processKey(sk, socketWrapper);
            }
        }

        // Process timeouts
        timeout(keyCount,hasEvents);
    }

    getStopLatch().countDown();
}
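
The handshake between wakeupCounter, addEvent() and the select call above is easy to miss, so the sketch below isolates just that protocol with a plain Runnable standing in for PollerEvent; WakeupHandshake is an illustrative class, not part of Tomcat.

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

final class WakeupHandshake {
    private final Selector selector;
    private final Queue<Runnable> events = new ConcurrentLinkedQueue<>();
    private final AtomicLong wakeupCounter = new AtomicLong(0);

    WakeupHandshake() throws IOException {
        this.selector = Selector.open();
    }

    // Producer side (addEvent): queue the event and wake the selector only when the
    // poller thread is currently blocked in select(), i.e. the counter was -1.
    void addEvent(Runnable event) {
        events.offer(event);
        if (wakeupCounter.incrementAndGet() == 0) {
            selector.wakeup();
        }
    }

    // Consumer side (one pass of the poll loop): drain queued events, then either do a
    // non-blocking selectNow() if more events arrived in the meantime, or block with a timeout.
    void loopOnce() throws IOException {
        Runnable event;
        while ((event = events.poll()) != null) {
            event.run();
        }
        if (wakeupCounter.getAndSet(-1) > 0) {
            selector.selectNow();
        } else {
            selector.select(1000);
        }
        wakeupCounter.set(0);
    }
}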

org.apache.tomcat.util.net.NioEndpoint.Poller#processKey

The method below handles the read and write events that fired on a channel.

protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
    try {
        if (close) {
            cancelledKey(sk, socketWrapper);
        } else if (sk.isValid()) {
            // Handle read/write events on the channel
            if (sk.isReadable() || sk.isWritable()) {
                if (socketWrapper.getSendfileData() != null) {
                    processSendfile(sk, socketWrapper, false);
                } else {
                    // Unregister interest in the operations that just fired so the same event is not selected repeatedly
                    unreg(sk, socketWrapper, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    // Handle read events first
                    if (sk.isReadable()) {
                        // Dispatch the actual read processing
                        if (socketWrapper.readOperation != null) {
                            if (!socketWrapper.readOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (socketWrapper.readBlocking) {
                            synchronized (socketWrapper.readLock) {
                                socketWrapper.readBlocking = false;
                                socketWrapper.readLock.notify();
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    // Then handle write events
                    if (!closeSocket && sk.isWritable()) {
                        if (socketWrapper.writeOperation != null) {
                            if (!socketWrapper.writeOperation.process()) {
                                closeSocket = true;
                            }
                        } else if (socketWrapper.writeBlocking) {
                            synchronized (socketWrapper.writeLock) {
                                socketWrapper.writeBlocking = false;
                                socketWrapper.writeLock.notify();
                            }
                        } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk, socketWrapper);
                    }
                }
            }
        } else {
            // Invalid key
            cancelledKey(sk, socketWrapper);
        }
    } catch (CancelledKeyException ckx) {
        cancelledKey(sk, socketWrapper);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}
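
The unreg() call above relies on a small but important NIO idiom: clear interest in the operations that just fired so the selector does not keep reporting them while a worker is still busy with the socket. In isolation it looks roughly like this (a sketch, not the Tomcat method):

import java.nio.channels.SelectionKey;

final class UnregSketch {
    // Equivalent idea to Poller#unreg: stop watching the operations that just fired;
    // interest is re-registered once the worker has finished processing.
    static void unreg(SelectionKey key) {
        key.interestOps(key.interestOps() & ~key.readyOps());
    }
}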

org.apache.tomcat.util.net.AbstractEndpoint#processSocket

In the method below the Poller turns the socket plus event into a SocketProcessor task and submits it to the worker thread pool, so processing becomes event driven.

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        // Nothing to do if the wrapper is gone
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = null;
        // Try to reuse a SocketProcessor from the cache
        if (processorCache != null) {
            sc = processorCache.pop();
        }
        // Create a new processor if none was cached, otherwise reset the cached one
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        // Hand the processor to the worker thread pool
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            // No executor (or dispatch == false): run it on the current thread
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
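
processorCache, like the nioChannels and eventCache stacks seen earlier, is one of Tomcat's SynchronizedStack object pools. The pop / reset / push recycle pattern looks roughly like the sketch below, with a hypothetical PooledTask standing in for SocketProcessor.

import java.util.ArrayDeque;
import java.util.Deque;

final class ProcessorPool {
    // Hypothetical pooled task standing in for SocketProcessorBase.
    static final class PooledTask implements Runnable {
        private String payload;
        PooledTask reset(String payload) { this.payload = payload; return this; }
        @Override public void run() { System.out.println("processing " + payload); }
    }

    private final Deque<PooledTask> cache = new ArrayDeque<>();

    // pop-or-create, mirroring processorCache.pop() followed by createSocketProcessor()/reset().
    synchronized PooledTask acquire(String payload) {
        PooledTask task = cache.poll();
        return (task == null ? new PooledTask() : task).reset(payload);
    }

    // Return the task to the pool once its run finishes (processorCache.push(this)).
    synchronized void release(PooledTask task) {
        cache.push(task);
    }
}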

Finally we arrive at doRun; the entry point is org.apache.tomcat.util.net.SocketProcessorBase#doRun.

This is where the SocketProcessor is ultimately executed.

org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun

@Override
protected void doRun() {
    /*
     * Do not cache and re-use the value of socketWrapper.getSocket() in
     * this method. If the socket closes the value will be updated to
     * CLOSED_NIO_CHANNEL and the previous value potentially re-used for
     * a new connection. That can result in a stale cached value which
     * in turn can result in unintentionally closing currently active
     * connections.
     */
    // Grab the Poller reference
    Poller poller = NioEndpoint.this.poller;
    // If the Poller is gone, close the socket and return
    if (poller == null) {
        socketWrapper.close();
        return;
    }

    try {
        int handshake = -1;
        try {
            if (socketWrapper.getSocket().isHandshakeComplete()) {
                // No TLS handshaking required. Let the handler
                // process this socket / event combination.
                handshake = 0;
            } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                    event == SocketEvent.ERROR) {
                // Unable to complete the TLS handshake. Treat it as
                // if the handshake failed.
                handshake = -1;
            } else {
                handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
                // The handshake process reads/writes from/to the
                // socket. status may therefore be OPEN_WRITE once
                // the handshake completes. However, the handshake
                // happens when the socket is opened so the status
                // must always be OPEN_READ after it completes. It
                // is OK to always set this as it is only used if
                // the handshake completes.
                event = SocketEvent.OPEN_READ;
            }
        } catch (IOException x) {
            handshake = -1;
            if (log.isDebugEnabled()) {
                log.debug("Error during SSL handshake",x);
            }
        } catch (CancelledKeyException ckx) {
            handshake = -1;
        }
        if (handshake == 0) {
            SocketState state = SocketState.OPEN;
            // Process the request from this socket
            if (event == null) {
                // No event given: treat it as OPEN_READ and hand off to the ConnectionHandler
                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
            } else {
                // Hand the event off to the ConnectionHandler
                state = getHandler().process(socketWrapper, event);
            }
            if (state == SocketState.CLOSED) {
                poller.cancelledKey(getSelectionKey(), socketWrapper);
            }
        } else if (handshake == -1 ) {
            getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
            poller.cancelledKey(getSelectionKey(), socketWrapper);
        } else if (handshake == SelectionKey.OP_READ){
            socketWrapper.registerReadInterest();
        } else if (handshake == SelectionKey.OP_WRITE){
            socketWrapper.registerWriteInterest();
        }
    } catch (CancelledKeyException cx) {
        poller.cancelledKey(getSelectionKey(), socketWrapper);
    } catch (VirtualMachineError vme) {
        ExceptionUtils.handleThrowable(vme);
    } catch (Throwable t) {
        log.error(sm.getString("endpoint.processing.fail"), t);
        poller.cancelledKey(getSelectionKey(), socketWrapper);
    } finally {
        socketWrapper = null;
        event = null;
        //return to cache
        if (running && processorCache != null) {
            processorCache.push(this);
        }
    }
}

org.apache.coyote.AbstractProtocol.ConnectionHandler#process

This method drives the request-processing flow and returns the resulting socket state.

@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.process",
                wrapper.getSocket(), status));
    }
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();

    // We take complete ownership of the Processor inside of this method to ensure
    // no other thread can release it while we're using it. Whatever processor is
    // held by this variable will be associated with the SocketWrapper before this
    // method returns.
    // Take ownership of the Processor currently associated with this socket
    Processor processor = (Processor) wrapper.takeCurrentProcessor();
    if (getLog().isDebugEnabled()) {
        getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet",
                processor, socket));
    }

    // Timeouts are calculated on a dedicated thread and then
    // dispatched. Because of delays in the dispatch process, the
    // timeout may no longer be required. Check here and avoid
    // unnecessary processing.
    if (SocketEvent.TIMEOUT == status &&
            (processor == null ||
            !processor.isAsync() && !processor.isUpgrade() ||
            processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) {
        // This is effectively a NO-OP
        return SocketState.OPEN;
    }
    //
    if (processor != null) {
        // Make sure an async timeout doesn't fire
        getProtocol().removeWaitingProcessor(processor);
    } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    try {
        if (processor == null) {
            String negotiatedProtocol = wrapper.getNegotiatedProtocol();
            // OpenSSL typically returns null whereas JSSE typically
            // returns "" when no protocol is negotiated
            if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) {
                UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol);
                if (upgradeProtocol != null) {
                    processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
                    }
                } else if (negotiatedProtocol.equals("http/1.1")) {
                    // Explicitly negotiated the default protocol.
                    // Obtain a processor below.
                } else {
                    // TODO:
                    // OpenSSL 1.0.2's ALPN callback doesn't support
                    // failing the handshake with an error if no
                    // protocol can be negotiated. Therefore, we need to
                    // fail the connection here. Once this is fixed,
                    // replace the code below with the commented out
                    // block.
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.negotiatedProcessor.fail",
                                negotiatedProtocol));
                    }
                    return SocketState.CLOSED;
                    /*
                     * To replace the code above once OpenSSL 1.1.0 is
                     * used.
                    // Failed to create processor. This is a bug.
                    throw new IllegalStateException(sm.getString(
                            "abstractConnectionHandler.negotiatedProcessor.fail",
                            negotiatedProtocol));
                    */
                }
            }
        }
        if (processor == null) {
            processor = recycledProcessors.pop();
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor));
            }
        }
        if (processor == null) {
            processor = getProtocol().createProcessor();
            // Register the new processor (e.g. for JMX request monitoring)
            register(processor);
            if (getLog().isDebugEnabled()) {
                getLog().debug(sm.getString("abstractConnectionHandler.processorCreate", processor));
            }
        }

        processor.setSslSupport(
                wrapper.getSslSupport(getProtocol().getClientCertProvider()));

        SocketState state = SocketState.CLOSED;
        do {
            // Run the processor and get the resulting socket state
            state = processor.process(wrapper, status);

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                // Restore leftover input to the wrapper so the upgrade
                // processor can process it.
                ByteBuffer leftOverInput = processor.getLeftoverInput();
                wrapper.unRead(leftOverInput);
                if (upgradeToken == null) {
                    // Assume direct HTTP/2 connection
                    UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c");
                    if (upgradeProtocol != null) {
                        // Release the Http11 processor to be re-used
                        release(processor);
                        // Create the upgrade processor
                        processor = upgradeProtocol.getProcessor(wrapper, getProtocol().getAdapter());
                    } else {
                        if (getLog().isDebugEnabled()) {
                            getLog().debug(sm.getString(
                                "abstractConnectionHandler.negotiatedProcessor.fail",
                                "h2c"));
                        }
                        // Exit loop and trigger appropriate clean-up
                        state = SocketState.CLOSED;
                    }
                } else {
                    HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                    // Release the Http11 processor to be re-used
                    release(processor);
                    // Create the upgrade processor
                    processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken);
                    if (getLog().isDebugEnabled()) {
                        getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate",
                                processor, wrapper));
                    }
                    // Initialise the upgrade handler (which may trigger
                    // some IO using the new protocol which is why the lines
                    // above are necessary)
                    // This cast should be safe. If it fails the error
                    // handling for the surrounding try/catch will deal with
                    // it.
                    if (upgradeToken.getInstanceManager() == null) {
                        httpUpgradeHandler.init((WebConnection) processor);
                    } else {
                        ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                        try {
                            httpUpgradeHandler.init((WebConnection) processor);
                        } finally {
                            upgradeToken.getContextBind().unbind(false, oldCL);
                        }
                    }
                }
            }
        } while ( state == SocketState.UPGRADING);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            longPoll(wrapper, processor);
            if (processor.isAsync()) {
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            release(processor);
            processor = null;
            wrapper.registerReadInterest();
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket either be added to the
            // poller (or equivalent) to await more data or processed
            // if there are any pipe-lined requests remaining.
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketEvent.OPEN_WRITE) {
                longPoll(wrapper, processor);
                getProtocol().addWaitingProcessor(processor);
            }
        } else if (state == SocketState.SUSPENDED) {
            // Don't add sockets back to the poller.
            // The resumeProcessing() method will add this socket
            // to the poller.
        } else {
            // Connection closed. OK to recycle the processor.
            // Processors handling upgrades require additional clean-up
            // before release.
            if (processor != null && processor.isUpgrade()) {
                UpgradeToken upgradeToken = processor.getUpgradeToken();
                HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
                InstanceManager instanceManager = upgradeToken.getInstanceManager();
                if (instanceManager == null) {
                    httpUpgradeHandler.destroy();
                } else {
                    ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
                    try {
                        httpUpgradeHandler.destroy();
                    } finally {
                        try {
                            instanceManager.destroyInstance(httpUpgradeHandler);
                        } catch (Throwable e) {
                            ExceptionUtils.handleThrowable(e);
                            getLog().error(sm.getString("abstractConnectionHandler.error"), e);
                        }
                        upgradeToken.getContextBind().unbind(false, oldCL);
                    }
                }
            }

            release(processor);
            processor = null;
        }

        if (processor != null) {
            wrapper.setCurrentProcessor(processor);
        }
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    } catch (ProtocolException e) {
        // Protocol exceptions normally mean the client sent invalid or
        // incomplete data.
        getLog().debug(sm.getString(
                "abstractConnectionHandler.protocolexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (OutOfMemoryError oome) {
        // Try and handle this here to give Tomcat a chance to close the
        // connection and prevent clients waiting until they time out.
        // Worst case, it isn't recoverable and the attempt at logging
        // will trigger another OOME.
        getLog().error(sm.getString("abstractConnectionHandler.oome"), oome);
    } catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(sm.getString("abstractConnectionHandler.error"), e);
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    release(processor);
    return SocketState.CLOSED;
}

org.apache.tomcat.util.threads.ThreadPoolExecutor#runWorker

This is the worker loop of the thread pool; there is quite a lot going on in it.

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep looping while there is a task: the initial one or one taken from the queue
        while (task != null || (task = getTask()) != null) {
            // Worker extends AQS; lock to mark this thread as busy
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // Make sure the thread's interrupt status matches the pool state
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted()) {
                wt.interrupt();
            }
            try {
                beforeExecute(wt, task);
                try {
                    // Run the task
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // Clear the task, count it as completed, and unlock
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Worker exit bookkeeping
        processWorkerExit(w, completedAbruptly);
    }
}
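
Note that runWorker() here is essentially the JDK worker loop; Tomcat ships its own fork of ThreadPoolExecutor (in org.apache.tomcat.util.threads) mainly to pair it with TaskQueue. The beforeExecute/afterExecute calls it makes are the standard extension hooks, shown below with the plain JDK executor in a small sketch.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class HookedExecutorDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(t.getName() + " about to run " + r);   // called from runWorker()
            }
            @Override protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("finished " + r + (t == null ? "" : " with " + t));
            }
        };
        pool.execute(() -> System.out.println("handling a request"));
        pool.shutdown();
    }
}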

That is the whole path from an incoming user request to its final handling. Not everything is listed here because the amount of code is simply too large; for the full details, download the source and read through it.

Finally

Tomcat implements its I/O handling itself on top of Java NIO rather than using Netty, although the design resembles Netty's reactor model in spirit, and Tomcat adds its own layers of wrapping on top. The goal of this article is to understand how Tomcat's thread model is initialized and configured, which variants exist, and which components a user request passes through. Only the core logic of each component is described here; the finer details will take more time to read through. The main things to take away are how the components under the hood cooperate and how the various design patterns interact. Interested readers can dig deeper with the source shown above, or follow the reference articles below.

References:

https://www.cnblogs.com/qmillet/p/12553328.html

https://zhuanlan.zhihu.com/p/393390855

https://blog.csdn.net/qq_16681169/article/details/75003640

https://juejin.cn/post/6844903966422073352

https://juejin.cn/post/6844904018955730951
