Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >tomcat请求处理分析(四) 监听请求轮询处理

tomcat请求处理分析(四) 监听请求轮询处理

作者头像
cfs
发布于 2018-03-08 07:38:05
发布于 2018-03-08 07:38:05
1.6K0
举报
文章被收录于专栏:编码小白编码小白
1.1.1.1  startInternal方法

这个方法是核心的启动方法,目前理解主要做了两件事情,第一件是创建轮询线程,即具体的读取线程,它是进行具体的处理,第二个是创建创建监听请求线程,它是等待请求,然后交给轮训进行处理。

public void startInternal() throws Exception { if (!running) { running = true; paused = false; //一种带锁的栈,processorCache processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); //事件缓存 eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); //nio管道 nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create workercollection if (getExecutor() == null ) {             createExecutor();  //实例化当前对象的成员变量executor,构建了一个线程池 }         initializeConnectionLatch(); //Poller的数量控制如果不设置的话最大就是2 pollers = new Poller[getPollerThreadCount()];         for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority);//用来设置进程、进程组和用户的进程执行优先权 pollerThread.setDaemon(true);//设置为守护线程 pollerThread.start(); } startAcceptorThreads(); } }

1.1.1.1.1     Poller启动

它是被设计成了守护线程,并且进行启动,其run方法如下,采用选择器的非阻塞方式,如果没有获取到注册事件返回空,下面迭代为空所以就什么都没有执行,如果返回不为空则会执行processKey方法。

public void run() { //这是一个线程,所以进行死循环 while (true) { try { //如果是暂停并且未关闭则睡10s while (paused &&(!close) ) { try {                     Thread.sleep(100); } catch (InterruptedExceptione) { }             } boolean hasEvents = false; //如果关闭之后,执行完毕时间后,关闭选择器 if (close) {                 events(); timeout(0, false);                 try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else {                 hasEvents = events(); } /**              * 如果endpoint是正常工作状态,处理已有的数据。              * 通过events方法来处理当前Poller中已有的事件(数据)。              * 同时使用selector.select或者selectNow来获取这个Poller上              * */ try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do                         //do a nonblocking select keyCount =selector.selectNow(); } else { keyCount =selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) {                     events(); timeout(0, false);                     try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; }             } catch (Throwablex) {                 ExceptionUtils.handleThrowable(x); log.error("",x);                 continue; } //either we timed out orwe woke up, process events first if ( keyCount == 0 ) hasEvents= (hasEvents | events()); //正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法 Iterator<SelectionKey>iterator = keyCount > 0 ? selector.selectedKeys().iterator(): null; // Walk through thecollection of ready keys and dispatch             // any active event. while (iterator !=null&&iterator.hasNext()) {                 SelectionKey sk =iterator.next(); KeyAttachmentattachment = (KeyAttachment)sk.attachment(); if (attachment== null) {                     iterator.remove(); } else {                     attachment.access(); iterator.remove(); //processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。  processKey(sk, attachment); }             }//while timeout(keyCount,hasEvents);             if ( oomParachute>0&&oomParachuteData==null)checkParachute(); } stopLatch.countDown();

}

1.1.1.1.2     Acceptor

这是一个接受请求的线程,调用的是startAcceptorThreads方法,方法代码如下:

protected final void startAcceptorThreads() { int count =getAcceptorThreadCount(); acceptors = new Acceptor[count];     for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName =getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); }

}

protectedAbstractEndpoint.AcceptorcreateAcceptor() { return new Acceptor(); }

    所以启动的事Acceptor的线程,主要调用的是其run方法,它做的事情是等待客户端请求,由于在bind方法中ServerSocketChannel这个设置阻塞方式,所以socket = serverSock.accept();在接受请求之后才会进行处理,具体的处理过程在setSocketOptions方法

/**  * Acceptor负责用来管理连接到tomcat服务器的数量  * socket连接建立成功之后,读写是交由Poller机制去完成。  * */ protected class Acceptor extends AbstractEndpoint.Acceptor{ @Override public void run() { int errorDelay =0; while (running) { while (paused && running) { state =AcceptorState.PAUSED;                 try {                     Thread.sleep(50); } catch (InterruptedExceptione) { }             } if (!running) { break; } state =AcceptorState.RUNNING;             try {                countUpOrAwaitConnection(); //计数+1,达到最大值则等待 SocketChannel socket = null;                 try { //ServerSocketChannel 一个阻塞监听等待请求  socket = serverSock.accept(); } catch (IOExceptionioe) { //we didn't geta socket countDownConnection(); // Introducedelay if necessary errorDelay =handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept,reset the error delay errorDelay = 0; // setSocketOptions() willadd channel to the poller                 // if successful if (running && !paused) { //将请求连接放入队列等待处理 if (!setSocketOptions(socket)) {                         countDownConnection(); closeSocket(socket); }                 } else {                     countDownConnection(); //计数-1 closeSocket(socket);   //关闭当前socket套接字 }             } catch (SocketTimeoutExceptionsx) { // Ignore: Normalcondition } catch (IOExceptionx) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); }             } catch (OutOfMemoryErroroom) { try { oomParachuteData=null; releaseCaches(); log.error("", oom); }catch ( Throwableoomt ) { try { try {                             System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (ThrowableletsHopeWeDontGetHere){                             ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); }                     }catch (ThrowableletsHopeWeDontGetHere){                         ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); }                 }             } catch (Throwablet) {                 ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); }         } state =AcceptorState.ENDED; }

}

1.1.1.1.3     acceptor线程转交到poller进行处理

      setSocketOptions方法通过通道获取真实的socket注入一些属性,然后构造NioChannel,将socket通道注入到对应的NioChannel实例,利用getPoller0用的循环的方式来返回Poller然后将NioChannel实例注册

protected boolean setSocketOptions(SocketChannel socket){ // Process the connection try { //设置为非阻塞  socket.configureBlocking(false); //获取socket   Socket sock = socket.socket();//实际socket //配置socket信息 socketProperties.setProperties(sock); //创建一个NioChannel 他封装了SocketChannel NioChannel channel = nioChannels.pop();         if ( channel == null ) { //如果为null 创建一个NioChannel 这里使用系统内存             //使用系统内存可以省去一步从系统内存拷贝到堆内存的动作、性能上会有很大的提升,nioChannels初始化默认为128个             //当socket 关闭的重新清理NioChannel而不是销毁这个对象可以达到对象复用的效果、因为申请系统内存的开销比申请堆内存的开销要大很多 if (sslContext != null) {                 SSLEngine engine =createSSLEngine();                 int appbufsize =engine.getSession().getApplicationBufferSize(); //NioBufferHandler里分别分配了读缓冲区和写缓冲区 NioBufferHandler bufhandler= newNioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()), Math.max(appbufsize,socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer()); channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); } else { // normal tcp setup NioBufferHandlerbufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); channel = new NioChannel(socket, bufhandler); }         } else { //如果存在通道,则直接将当前socket注入  channel.setIOChannel(socket);             if ( channel instanceof SecureNioChannel) {                 SSLEngine engine =createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else {                 channel.reset(); }         } // 这里就是将SocketChannel注册到Poller了。         // getPoller0用的循环的方式来返回Poller,即Poller 1, 2,3... n 然后再回到1, 2, 3..  getPoller0().register(channel); } catch (Throwablet) {         ExceptionUtils.handleThrowable(t);         try { log.error("",t); } catch (Throwablett) {             ExceptionUtils.handleThrowable(tt); } // Tell to close thesocket return false; } return true;

}

上文注册还不是选择器的注入方式,而是在NioEndpoint内部类Poller类的register方法,其代码如下:在前面设置了一些基本属性,然后调用addEvent唤醒对应的选择器,这个selector实例是Poller对象的一个成员变量,对应的非阻塞过程在run方法,所以监听请求世实际还是在Poller的run方法中selectNow后面进行处理

public void register(final NioChannelsocket) { //给当前socket设置为这个Poller实例 socket.setPoller(this); //构造KeyAttachment实例,其继承SocketWrapper KeyAttachment ka = new KeyAttachment(socket); //设置其轮询实例 ka.setPoller(this); ka.setTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); // 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象 PollerEvent r = eventCache.pop(); // 设置读操作为感兴趣的操作 ka.interestOps(SelectionKey.OP_READ);     if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);     else r.reset(socket,ka,OP_REGISTER); // 加入到Poller对象里的事件队列  addEvent(r); }

private void addEvent(PollerEvent event) { events.offer(event);     if ( wakeupCounter.incrementAndGet()== 0)selector.wakeup(); }

   具体执行的接受到通道注册的时间之后,往下执行,就能够产生相应的选择键,这样会执行processKey这个方法,然后将请求进行处理,并解析成相关的流,返回到界面。

public void run() {  …… /**              * 如果endpoint是正常工作状态,处理已有的数据。              * 通过events方法来处理当前Poller中已有的事件(数据)。              * 同时使用selector.select或者selectNow来获取这个Poller上              * */ try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) {    keyCount = selector.selectNow(); } else {  keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) {                     events(); timeout(0, false);                     try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; }             } catch (Throwablex) {                 ExceptionUtils.handleThrowable(x); log.error("",x);                 continue; } if ( keyCount == 0 ) hasEvents= (hasEvents | events()); //正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法 Iterator<SelectionKey>iterator = keyCount > 0 ? selector.selectedKeys().iterator(): null; // Walk through thecollection of ready keys and dispatch             // any active event. while (iterator !=null&&iterator.hasNext()) {                 SelectionKey sk =iterator.next(); KeyAttachmentattachment = (KeyAttachment)sk.attachment(); // Attachment may be nullif another thread has called                 // cancelledKey() if (attachment== null) {                     iterator.remove(); } else {                     attachment.access(); iterator.remove(); //processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。  processKey(sk, attachment); }             }//while             //process timeouts timeout(keyCount,hasEvents);             if ( oomParachute>0&&oomParachuteData==null)checkParachute(); } catch (OutOfMemoryErroroom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwableoomt ) { try {                     System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (ThrowableletsHopeWeDontGetHere){                     ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); }             }         }     }//while stopLatch.countDown();

}

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017年08月19日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Java编程架构详解——Tomcat 中的 NIO 源码分析
文将介绍 Tomcat 中的 NIO 使用,使大家对 Java NIO 的生产使用有更加直观的认识。
慕容千语
2019/06/11
1.2K0
tomcat nio源码分析
1 NioEndpoint.Acceptor等待客户端连接,客户端连接之后将SocketChannel转发给Poller
东营浪人
2020/02/11
5980
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第二篇)
深入探究 Connector 启动逻辑后,我们接下来需细致分析 HTTP 请求的执行流程。从客户端发出的请求,要经历哪些环节才能最终被处理?
@派大星
2024/09/20
2020
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第二篇)
Tomcat NIO
说起Tomcat的NIO,不得不提的就是Connector这个Tomcat组件。Connector是Tomcat的连接器,其主要任务是负责处理收到的请求,并创建一个Request和Response的对象,然后用一个线程用于处理请求,Connector会把Request和Response对象传递给该线程,该线程的具体的处理过程是Container容器的事了。 在tomcat启动过程中,会初始化Connector,并调用Connector的startInternal()方法开启Connector,开始监听、处理
欠扁的小篮子
2018/04/16
1.9K0
Tomcat NIO
Tomcat NIO(7)-Poller
在上一篇文章里我们主要介绍了 tomcat NIO 中的 acceptor 线程,其中包括了server 监听 socket 的初始化,端口绑定,acceptor 线程的启动,接受连接请求,将请求事件注册到 poller 线程。在这里我们主要介绍 poller 线程。
TA码字
2020/09/01
2K0
tomcat线程模型-源码解析
tomcat8以上默认是NIO,tomcat支持四种接收请求的处理方式:BIO,NIO,APR、AIO,用于处理tomcat处理客户端连接进来的后的各种请求的处理。其中处理连接的线程为单线程,而处理如果是读写事件则交给专门的线程池处理。
逍遥壮士
2022/12/01
1.4K0
tomcat线程模型-源码解析
Tomcat源码解析(六):Connector、ProtocolHandler、Endpoint
Tomcat源码解析(四):StandardServer和StandardService
Java微观世界
2025/01/21
5450
Tomcat源码解析(六):Connector、ProtocolHandler、Endpoint
从SocketTimeoutException到全连接队列和半连接队列
大概在一年半之前的时候,我们的应用的某个业务开始间歇报SocketTimeoutException, 不是前端调用我们发生SocketTimeoutException,而是我们用 HTTP Client中台拉取数据的时候,会偶尔报SocketTimeException, 这个偶尔可能是一个月报一次,也可能是两个月报一次,可能一个星期报两次,频率不固定,次数也不固定,当我第一次看到这个异常的时候,我的第一个反应就是用这个异常信息去搜索引擎上搜索解决方案,我并不理解这个异常说明了什么,但是按照我以往的经验来说,一般都有解决方案,对搜索引擎的方案一般都是延长超时时间,于是我延长了超时时间,但这并没有根本上解决问题,还是会出问题。延长超时时间不管用之后,我就扩容,但是扩容依然也不管用,我当时在尝试复现这个异常的时候,也忽略了一些东西,然后导致我在测试无法复现,能够复现的问题都是好问题,我之前面试的时候也背过三次握手,也学过Java 的原生Socket 编程,Netty,我背过Tomcat的acceptCount参数,但是碰到这个问题,这些知识仍然没有帮我解决问题,原因当时我网络的知识没有连接起来,他们孤零零的,向孤零零的神经元一样,没建立起来连接,最后这个问题开始让这些知识开始建立连接,成体系的发展。连接才是有价值的。
fliter
2023/10/09
5640
从SocketTimeoutException到全连接队列和半连接队列
Tomcat NIO(6)-Acceptor
在上一篇文章里我们主要介绍了 tomcat NIO 的整体架构,以及在这个架构下的各个线程,在这里我们主要介绍 acceptor 线程。
TA码字
2020/08/25
1.3K0
Tomcat - Tomcat 网络通信模型剖析 & 并发参数解读
IO是指为数据传输所提供的输入输出流,其输入输出对象可以是:文件、网络服务、内存等。
小小工匠
2021/08/17
9730
Tomcat卷二---请求流程源码分析
设计了这么多层次的容器,Tomcat是怎么确定每一个请求应该由哪个Wrapper容器里的 Servlet来处理的呢?
大忽悠爱学习
2022/05/09
8130
Tomcat卷二---请求流程源码分析
曹工说Tomcat:200个http-nio-8080线程全都被第三方服务拖住了,这可如何是好(上:线程模型解析)
这两年,tomcat慢慢在新项目里不怎么接触了,因为都被spring boot之类的框架封装进了内部,成了内置server,不用像过去那样打个war包,再放到tomcat里部署了。
低级知识传播者
2021/09/23
2.6K0
曹工说Tomcat:200个http-nio-8080线程全都被第三方服务拖住了,这可如何是好(上:线程模型解析)
Tomcat中BIO与NIO
Connector是一个桥梁它把Server和Engine链接了起来,Connector的作用是接受客户端端的请求,然后把请求委托为engine容器去处理。 Connector内部使用endpoint进行处理,根据处理方式的不同分为NioEndpoint,JIoEndpoint,AprEndpoint。
加多
2018/09/06
1.4K0
Tomcat中BIO与NIO
tomcat请求处理分析(五) 请求到响应流
1.1.1.1  请求到响应界面流 请求处理的过程主要是将所有的东西解析成流,转化成对应的http报文,所以在这里我先不关注servlet因为它最终也就是解析成流里面的数据 processKey里面最终执行的是processSocket,它是线从缓存中获取对应的线程池,没有的话就创建一个,然后进行执行 protected boolean processSocket(KeyAttachmentattachment, SocketStatus status, boolean dispatch) { try {
cfs
2018/03/08
1.3K0
Tomcat NIO(13)-BlockPoller
在上一篇文章里我们主要介绍了 tomcat NIO 中响应数据的写入,在这里我们主要介绍 BlockPoller 线程。
TA码字
2020/11/11
1.2K0
Tomcat架构解析之3 Connector NIOAcceptorPollerWorkerNioSelectorPool
上文简单记录了默认的Connector的内部构造及消息流,同时此Connector也是基于BIO的实现。 除BIO,也可以通过配置快速部署NIO的connector。在server.xml中如下配置
JavaEdge
2018/05/16
7830
深入浅出Tomcat网络通信的高并发处理机制
随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键
菜菜的后端私房菜
2024/08/06
5561
Tomcat NIO(8)-Poller线程的阻塞与唤醒
在上一篇文章里我们主要介绍了 tomcat NIO 中的 poller 线程,包括启动 poller 线程,添加事件到事件队列,对原始 socket 注册事件和 poller 线程的核心逻辑。在这里我们主要介绍 poller 线程的阻塞与唤醒。
TA码字
2020/09/10
1.5K0
Tomcat卷一 ----架构和初始化源码分析
1) Tomcat 最初由Sun公司的软件架构师 James Duncan Davidson 开发,名称为 “JavaWebServer”。
大忽悠爱学习
2022/05/09
9810
Tomcat卷一 ----架构和初始化源码分析
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第一篇)
终于步入 Connector 的解析阶段,这无疑是 Tomcat 架构中最为复杂的一环。作为连接器,它的职责显而易见——连接。那么,它连接的究竟是什么呢?
@派大星
2024/09/18
2300
【Tomcat源码分析】从零开始理解 HTTP 请求处理 (第一篇)
相关推荐
Java编程架构详解——Tomcat 中的 NIO 源码分析
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档