这个方法是核心的启动方法,目前理解主要做了两件事情,第一件是创建轮询线程,即具体的读取线程,它是进行具体的处理,第二个是创建创建监听请求线程,它是等待请求,然后交给轮训进行处理。
public void startInternal() throws Exception { if (!running) { running = true; paused = false; //一种带锁的栈,processorCache processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); //事件缓存 eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); //nio管道 nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool()); // Create workercollection if (getExecutor() == null ) { createExecutor(); //实例化当前对象的成员变量executor,构建了一个线程池 } initializeConnectionLatch(); //Poller的数量控制如果不设置的话最大就是2 pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority);//用来设置进程、进程组和用户的进程执行优先权 pollerThread.setDaemon(true);//设置为守护线程 pollerThread.start(); } startAcceptorThreads(); } }
它是被设计成了守护线程,并且进行启动,其run方法如下,采用选择器的非阻塞方式,如果没有获取到注册事件返回空,下面迭代为空所以就什么都没有执行,如果返回不为空则会执行processKey方法。
public void run() { //这是一个线程,所以进行死循环 while (true) { try { //如果是暂停并且未关闭则睡10s while (paused &&(!close) ) { try { Thread.sleep(100); } catch (InterruptedExceptione) { } } boolean hasEvents = false; //如果关闭之后,执行完毕时间后,关闭选择器 if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } else { hasEvents = events(); } /** * 如果endpoint是正常工作状态,处理已有的数据。 * 通过events方法来处理当前Poller中已有的事件(数据)。 * 同时使用selector.select或者selectNow来获取这个Poller上 * */ try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a nonblocking select keyCount =selector.selectNow(); } else { keyCount =selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwablex) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out orwe woke up, process events first if ( keyCount == 0 ) hasEvents= (hasEvents | events()); //正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法 Iterator<SelectionKey>iterator = keyCount > 0 ? selector.selectedKeys().iterator(): null; // Walk through thecollection of ready keys and dispatch // any active event. while (iterator !=null&&iterator.hasNext()) { SelectionKey sk =iterator.next(); KeyAttachmentattachment = (KeyAttachment)sk.attachment(); if (attachment== null) { iterator.remove(); } else { attachment.access(); iterator.remove(); //processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。 processKey(sk, attachment); } }//while timeout(keyCount,hasEvents); if ( oomParachute>0&&oomParachuteData==null)checkParachute(); } stopLatch.countDown();
}
这是一个接受请求的线程,调用的是startAcceptorThreads方法,方法代码如下:
protected final void startAcceptorThreads() { int count =getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); String threadName =getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); }
}
protectedAbstractEndpoint.AcceptorcreateAcceptor() { return new Acceptor(); }
所以启动的事Acceptor的线程,主要调用的是其run方法,它做的事情是等待客户端请求,由于在bind方法中ServerSocketChannel这个设置阻塞方式,所以socket = serverSock.accept();在接受请求之后才会进行处理,具体的处理过程在setSocketOptions方法
/** * Acceptor负责用来管理连接到tomcat服务器的数量 * socket连接建立成功之后,读写是交由Poller机制去完成。 * */ protected class Acceptor extends AbstractEndpoint.Acceptor{ @Override public void run() { int errorDelay =0; while (running) { while (paused && running) { state =AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedExceptione) { } } if (!running) { break; } state =AcceptorState.RUNNING; try { countUpOrAwaitConnection(); //计数+1,达到最大值则等待 SocketChannel socket = null; try { //ServerSocketChannel 一个阻塞监听等待请求 socket = serverSock.accept(); } catch (IOExceptionioe) { //we didn't geta socket countDownConnection(); // Introducedelay if necessary errorDelay =handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } // Successful accept,reset the error delay errorDelay = 0; // setSocketOptions() willadd channel to the poller // if successful if (running && !paused) { //将请求连接放入队列等待处理 if (!setSocketOptions(socket)) { countDownConnection(); closeSocket(socket); } } else { countDownConnection(); //计数-1 closeSocket(socket); //关闭当前socket套接字 } } catch (SocketTimeoutExceptionsx) { // Ignore: Normalcondition } catch (IOExceptionx) { if (running) { log.error(sm.getString("endpoint.accept.fail"), x); } } catch (OutOfMemoryErroroom) { try { oomParachuteData=null; releaseCaches(); log.error("", oom); }catch ( Throwableoomt ) { try { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (ThrowableletsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } }catch (ThrowableletsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } catch (Throwablet) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state =AcceptorState.ENDED; }
}
setSocketOptions方法通过通道获取真实的socket注入一些属性,然后构造NioChannel,将socket通道注入到对应的NioChannel实例,利用getPoller0用的循环的方式来返回Poller然后将NioChannel实例注册
protected boolean setSocketOptions(SocketChannel socket){ // Process the connection try { //设置为非阻塞 socket.configureBlocking(false); //获取socket Socket sock = socket.socket();//实际socket //配置socket信息 socketProperties.setProperties(sock); //创建一个NioChannel 他封装了SocketChannel NioChannel channel = nioChannels.pop(); if ( channel == null ) { //如果为null 创建一个NioChannel 这里使用系统内存 //使用系统内存可以省去一步从系统内存拷贝到堆内存的动作、性能上会有很大的提升,nioChannels初始化默认为128个 //当socket 关闭的重新清理NioChannel而不是销毁这个对象可以达到对象复用的效果、因为申请系统内存的开销比申请堆内存的开销要大很多 if (sslContext != null) { SSLEngine engine =createSSLEngine(); int appbufsize =engine.getSession().getApplicationBufferSize(); //NioBufferHandler里分别分配了读缓冲区和写缓冲区 NioBufferHandler bufhandler= newNioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()), Math.max(appbufsize,socketProperties.getAppWriteBufSize()), socketProperties.getDirectBuffer()); channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool); } else { // normal tcp setup NioBufferHandlerbufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); channel = new NioChannel(socket, bufhandler); } } else { //如果存在通道,则直接将当前socket注入 channel.setIOChannel(socket); if ( channel instanceof SecureNioChannel) { SSLEngine engine =createSSLEngine(); ((SecureNioChannel)channel).reset(engine); } else { channel.reset(); } } // 这里就是将SocketChannel注册到Poller了。 // getPoller0用的循环的方式来返回Poller,即Poller 1, 2,3... n 然后再回到1, 2, 3.. getPoller0().register(channel); } catch (Throwablet) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwablett) { ExceptionUtils.handleThrowable(tt); } // Tell to close thesocket return false; } return true;
}
上文注册还不是选择器的注入方式,而是在NioEndpoint内部类Poller类的register方法,其代码如下:在前面设置了一些基本属性,然后调用addEvent唤醒对应的选择器,这个selector实例是Poller对象的一个成员变量,对应的非阻塞过程在run方法,所以监听请求世实际还是在Poller的run方法中selectNow后面进行处理
public void register(final NioChannelsocket) { //给当前socket设置为这个Poller实例 socket.setPoller(this); //构造KeyAttachment实例,其继承SocketWrapper KeyAttachment ka = new KeyAttachment(socket); //设置其轮询实例 ka.setPoller(this); ka.setTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); // 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象 PollerEvent r = eventCache.pop(); // 设置读操作为感兴趣的操作 ka.interestOps(SelectionKey.OP_READ); if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); // 加入到Poller对象里的事件队列 addEvent(r); }
private void addEvent(PollerEvent event) { events.offer(event); if ( wakeupCounter.incrementAndGet()== 0)selector.wakeup(); }
具体执行的接受到通道注册的时间之后,往下执行,就能够产生相应的选择键,这样会执行processKey这个方法,然后将请求进行处理,并解析成相关的流,返回到界面。
public void run() { …… /** * 如果endpoint是正常工作状态,处理已有的数据。 * 通过events方法来处理当前Poller中已有的事件(数据)。 * 同时使用selector.select或者selectNow来获取这个Poller上 * */ try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOExceptionioe) { log.error(sm.getString( "endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwablex) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } if ( keyCount == 0 ) hasEvents= (hasEvents | events()); //正常状态下的数据处理,通过processKey来实现。获取对应的渠道的key,然后调用processKey方法 Iterator<SelectionKey>iterator = keyCount > 0 ? selector.selectedKeys().iterator(): null; // Walk through thecollection of ready keys and dispatch // any active event. while (iterator !=null&&iterator.hasNext()) { SelectionKey sk =iterator.next(); KeyAttachmentattachment = (KeyAttachment)sk.attachment(); // Attachment may be nullif another thread has called // cancelledKey() if (attachment== null) { iterator.remove(); } else { attachment.access(); iterator.remove(); //processKey的主要工作是调用NioEndpoint的processSocket来实现socket的读写。 processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); if ( oomParachute>0&&oomParachuteData==null)checkParachute(); } catch (OutOfMemoryErroroom) { try { oomParachuteData = null; releaseCaches(); log.error("", oom); }catch ( Throwableoomt ) { try { System.err.println(oomParachuteMsg); oomt.printStackTrace(); }catch (ThrowableletsHopeWeDontGetHere){ ExceptionUtils.handleThrowable(letsHopeWeDontGetHere); } } } }//while stopLatch.countDown();
}