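Processor is an interface with a different concrete implementation for each protocol. The implementations handle HTTP requests: they parse the protocol, manage connection state, and produce the response. A Processor then acts as an intermediary and forwards the request to the Adapter. The class diagram is shown below.
Of these implementations, the ones for the HTTP protocol are used most often, so we normally deal with the left-hand part of the diagram, marked with a red line.
1.1 The recycle queue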
protected static class RecycledProcessors<P extends Processor<S>, S> extends SynchronizedStack<Processor<S>> {
private final transient AbstractConnectionHandler<S,P> handler;
protected final AtomicInteger size = new AtomicInteger(0);
public RecycledProcessors(AbstractConnectionHandler<S,P> handler) {
this.handler = handler;
}
@SuppressWarnings("sync-override") // Size may exceed cache size a bit
@Override
public boolean push(Processor<S> processor) {
// Maximum number of Processors that may be cached (-1 means unlimited)
int cacheSize = handler.getProtocol().getProcessorCache();
boolean offer = cacheSize == -1 ? true : size.get() < cacheSize;
// Push the current processor onto the stack if there is room
boolean result = false;
if (offer) {
result = super.push(processor);
if (result) {
size.incrementAndGet();
}
}
// Not cached: unregister this processor instance from JMX
if (!result) handler.unregister(processor);
return result;
}
@SuppressWarnings("sync-override") // OK if size is too big briefly
@Override
public Processor<S> pop() {
Processor<S> result = super.pop();
if (result != null) {
size.decrementAndGet();
}
return result;
}
@Override
public synchronized void clear() {
Processor<S> next = pop();
while (next != null) {
handler.unregister(next);
next = pop();
}
super.clear();
size.set(0);
}
}
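Before describing how a Processor is obtained and used, look at the class above, which I will loosely call the recycle queue. Despite that name it is a stack: it extends SynchronizedStack (Tomcat's own stack implementation) and overrides its push and pop operations, as well as clear.
1.2 Creating a Processor
The call stack shows that the method invoked is process in the abstract class of the protocol's connection handler (AbstractConnectionHandler), so the procedure is largely the same for all four I/O modes. The code is as follows: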
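The push and pop behaviour of the class above can be sketched in isolation. This is a minimal stand-in, not Tomcat's code: BoundedRecycleStack is a hypothetical name, java.util.ArrayDeque replaces SynchronizedStack, and the JMX unregistration is only mentioned in a comment.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for RecycledProcessors: a bounded, thread-safe LIFO cache.
// ArrayDeque replaces Tomcat's SynchronizedStack; all names here are illustrative.
class BoundedRecycleStack<T> {
    private final Deque<T> stack = new ArrayDeque<>();
    private final AtomicInteger size = new AtomicInteger(0);
    private final int cacheSize; // -1 means "unbounded", as in the listing above

    BoundedRecycleStack(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    // Returns false when the cache is full; the caller then discards the object
    // (at this point Tomcat unregisters the processor from JMX).
    synchronized boolean push(T item) {
        boolean offer = cacheSize == -1 || size.get() < cacheSize;
        if (!offer) {
            return false;
        }
        stack.push(item);
        size.incrementAndGet();
        return true;
    }

    // Returns the most recently pushed item, or null when the stack is empty.
    synchronized T pop() {
        T result = stack.poll();
        if (result != null) {
            size.decrementAndGet();
        }
        return result;
    }

    int size() {
        return size.get();
    }
}
```

With a cache size of 2, a third push is rejected and pop returns items in last-in, first-out order, which is why the most recently released Processor is the first one to be reused.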
public SocketState process(SocketWrapper<S> wrapper,
SocketStatus status) {
// A null SocketWrapper means there is no socket, so return CLOSED directly
if (wrapper == null) {
return SocketState.CLOSED;
}
// Get the underlying socket of this SocketWrapper (the NIO channel in NIO mode)
S socket = wrapper.getSocket();
if (socket == null) {
// Nothing to do, the socket has already been closed
return SocketState.CLOSED;
}
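/**
* Look up the Processor for this socket in connections; if there is none,
* one is obtained further down. connections is a Map<S,Processor<S>>.
* connections holds an entry for the socket in these cases:
* 1. WebSocket
* 2. asynchronous servlet
* 3. sendfile
* */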
Processor<S> processor = connections.get(socket);
if (status == SocketStatus.DISCONNECT && processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
wrapper.setAsync(false);
// Mark the current thread as a container thread (set() flags it as one)
ContainerThreadMarker.set();
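/**
* Obtain a Processor; in NIO mode this is an Http11NioProcessor instance.
* Its constructor builds the request and response member variables.
* The request holds an input buffer (InternalNioInputBuffer) in its
* inputBuffer field, and the response holds the corresponding output
* buffer in its outputBuffer field.
* */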
try {
if (processor == null) {
processor = recycledProcessors.pop();
}
if (processor == null) {
processor = createProcessor();
}
initSsl(wrapper, processor);
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
// Associate the processor with the connection as
// these calls may result in a nested call to process()
connections.put(socket, processor);
DispatchType nextDispatch = dispatches.next();
if (processor.isUpgrade()) {
state = processor.upgradeDispatch(
nextDispatch.getSocketStatus());
} else {
state = processor.asyncDispatch(
nextDispatch.getSocketStatus());
}
} else if (processor.isComet()) {
state = processor.event(status);
} else if (processor.isUpgrade()) {
state = processor.upgradeDispatch(status);
} else if (status == SocketStatus.DISCONNECT) {
// Comet and upgrade need to see DISCONNECT but the
// others don't. NO-OP and let socket close.
} else if (processor.isAsync() || state == SocketState.ASYNC_END) {
state = processor.asyncDispatch(status);
if (state == SocketState.OPEN) {
// release() won't get called so in case this request
// takes a long time to process, remove the socket from
// the waiting requests now else the async timeout will
// fire
getProtocol().endpoint.removeWaitingRequest(wrapper);
// There may be pipe-lined data to read. If the data
// isn't processed now, execution will exit this
// loop and call release() which will recycle the
// processor (and input buffer) deleting any
// pipe-lined data. To avoid this, process it now.
state = processor.process(wrapper);
}
} else if (status == SocketStatus.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else {
// This branch runs the first time a request is processed
state = processor.process(wrapper);
}
// Set the socket state according to the state of the async state machine (asyncStateMachine)
if (state != SocketState.CLOSED && processor.isAsync()) {
state = processor.asyncPostProcess();
}
if (state == SocketState.UPGRADING) {
// Get the HTTP upgrade handler
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
// Retrieve leftover input
ByteBuffer leftoverInput = processor.getLeftoverInput();
// Release the Http11 processor to be re-used
release(wrapper, processor, false, false);
// Create the upgrade processor
processor = createUpgradeProcessor(
wrapper, leftoverInput, upgradeToken);
// Mark the connection as upgraded
wrapper.setUpgraded(true);
// Associate with the processor with the connection
connections.put(socket, processor);
// Initialise the upgrade handler (which may trigger
// some IO using the new protocol which is why the lines
// above are necessary)
// This cast should be safe. If it fails the error
// handling for the surrounding try/catch will deal with
// it.
if (upgradeToken.getInstanceManager() == null) {
httpUpgradeHandler.init((WebConnection) processor);
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.init((WebConnection) processor);
} finally {
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + wrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = wrapper.getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
state == SocketState.UPGRADING ||
dispatches != null && state != SocketState.CLOSED);
if (state == SocketState.LONG) {
// In the middle of processing a request/response. Keep the
// socket associated with the processor. Exact requirements
// depend on type of long poll
// An async request is put into connections the first time it is processed
connections.put(socket, processor);
longPoll(wrapper, processor);
} else if (state == SocketState.OPEN) {
// In keep-alive but between requests. OK to recycle
// processor. Continue to poll for the next request.
connections.remove(socket);
release(wrapper, processor, false, true);
} else if (state == SocketState.SENDFILE) {
// Sendfile in progress. If it fails, the socket will be
// closed. If it works, the socket either be added to the
// poller (or equivalent) to await more data or processed
// if there are any pipe-lined requests remaining.
connections.put(socket, processor);
} else if (state == SocketState.UPGRADED) {
// Don't add sockets back to the poller if this was a
// non-blocking write otherwise the poller may trigger
// multiple read events which may lead to thread starvation
// in the connector. The write() method will add this socket
// to the poller if necessary.
if (status != SocketStatus.OPEN_WRITE) {
longPoll(wrapper, processor);
}
} else {
// Connection closed. OK to recycle the processor. Upgrade
// processors are not recycled.
connections.remove(socket);
if (processor.isUpgrade()) {
UpgradeToken upgradeToken = processor.getUpgradeToken();
HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler();
InstanceManager instanceManager = upgradeToken.getInstanceManager();
if (instanceManager == null) {
httpUpgradeHandler.destroy();
} else {
ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null);
try {
httpUpgradeHandler.destroy();
} finally {
try {
instanceManager.destroyInstance(httpUpgradeHandler);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
getLog().error(sm.getString("abstractConnectionHandler.error"), e);
}
upgradeToken.getContextBind().unbind(false, oldCL);
}
}
} else {
release(wrapper, processor, true, false);
}
}
return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.socketexception.debug"), e);
} catch (java.io.IOException e) {
// IOExceptions are normal
getLog().debug(sm.getString(
"abstractConnectionHandler.ioexception.debug"), e);
}
// Future developers: if you discover any other
// rare-but-nonfatal exceptions, catch them here, and log as
// above.
catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
// any other exception or error is odd. Here we log it
// with "ERROR" level, so it will show up even on
// less-than-verbose logs.
getLog().error(
sm.getString("abstractConnectionHandler.error"), e);
} finally {
ContainerThreadMarker.clear();
}
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
// Don't try to add upgrade processors back into the pool
if (processor !=null && !processor.isUpgrade()) {
release(wrapper, processor, true, false);
}
return SocketState.CLOSED;
}
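The code shows that a Processor is obtained in one of three ways. First, the connections map is searched for a Processor instance keyed by the socket. You may wonder how the same socket can show up again: the cases I know of are long-lived connections and Upgrade, where keeping the mapping preserves the protocol state and part of the request data. If nothing is found there, a Processor is taken from the recycle queue (described in section 1.1 above), which amounts to popping an element from a stack. This works because a Processor that has finished its work is not discarded; it is released and pushed onto the stack so that it can be reused directly the next time. If the stack is empty, a new instance is created. It follows that Processor instances have little to do with individual browser requests: one session may be served by different Processors, and different sessions may be served by the same Processor.
1.3 Releasing a Processor
Once the current socket has been processed, the Processor is released: part of its internal state (its handles) is reset, and it is then pushed onto the recycle queue for later reuse. The details differ somewhat between BIO, NIO, and AIO: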
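The three-step lookup described above can be sketched as follows. This is an illustration under assumptions rather than Tomcat's implementation: ProcessorLookupSketch, acquire, and recycle are hypothetical names, and a Supplier stands in for createProcessor().

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Illustrative only: mirrors the lookup order used in process(), not Tomcat's real types.
class ProcessorLookupSketch<S, P> {
    // Processors still bound to a socket (long poll, upgrade, sendfile, ...)
    final Map<S, P> connections = new ConcurrentHashMap<>();
    // Released processors waiting to be reused (a plain stack in this sketch)
    private final Deque<P> recycled = new ArrayDeque<>();
    // Stand-in for createProcessor()
    private final Supplier<P> factory;

    ProcessorLookupSketch(Supplier<P> factory) {
        this.factory = factory;
    }

    synchronized P acquire(S socket) {
        // 1. A processor already associated with this socket wins
        P processor = connections.get(socket);
        // 2. Otherwise reuse a recycled processor, if any
        if (processor == null) {
            processor = recycled.poll();
        }
        // 3. Otherwise create a fresh one
        if (processor == null) {
            processor = factory.get();
        }
        return processor;
    }

    // Hand a finished processor back for later reuse
    synchronized void recycle(P processor) {
        recycled.push(processor);
    }
}
```

Because step 2 hands out whichever instance was released last, the same Processor object can serve unrelated sockets over time, which matches the observation that instances are not tied to a particular session.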
protected abstract void release(SocketWrapper<S> socket,
Processor<S> processor, boolean socketClosing,
boolean addToPoller);
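To make the contract of release more concrete, here is a rough sketch of what a concrete implementation might do, assuming it resets the processor, pushes it onto the recycle queue, and optionally hands the socket back to a poller. ReleaseSketch, FakeProcessor, and the poller list are hypothetical stand-ins; the real BIO, NIO, and AIO implementations differ in their details.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins; none of these types are Tomcat classes.
class ReleaseSketch {
    static class FakeProcessor {
        String currentRequest; // per-request state held by the processor

        // Reset per-request state so the instance can serve another request.
        // The flag is accepted to mirror the signature above; this sketch ignores it.
        void recycle(boolean socketClosing) {
            currentRequest = null;
        }
    }

    // Stand-in for the recycle queue
    final Deque<FakeProcessor> recycledProcessors = new ArrayDeque<>();
    // Stand-in for the poller: sockets waiting for the next read event
    final List<String> poller = new ArrayList<>();

    void release(String socket, FakeProcessor processor,
                 boolean socketClosing, boolean addToPoller) {
        // 1. Reset the processor
        processor.recycle(socketClosing);
        // 2. Make it available for reuse
        recycledProcessors.push(processor);
        // 3. Keep-alive case: wait for the next request on this socket
        if (addToPoller) {
            poller.add(socket);
        }
    }
}
```

In the process method above, release(wrapper, processor, false, true) is the keep-alive case (state OPEN), while release(wrapper, processor, true, false) is used when the connection is closing.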