本文主要研究一下httpclient的getPoolEntryBlocking
org/apache/http/pool/AbstractConnPool.java
private E getPoolEntryBlocking(
final T route, final Object state,
final long timeout, final TimeUnit timeUnit,
final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {
Date deadline = null;
if (timeout > 0) {
deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
}
this.lock.lock();
try {
final RouteSpecificPool<T, C, E> pool = getPool(route);
E entry;
for (;;) {
Asserts.check(!this.isShutDown, "Connection pool shut down");
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
for (;;) {
entry = pool.getFree(state);
if (entry == null) {
break;
}
if (entry.isExpired(System.currentTimeMillis())) {
entry.close();
}
if (entry.isClosed()) {
this.available.remove(entry);
pool.free(entry, false);
} else {
break;
}
}
if (entry != null) {
this.available.remove(entry);
this.leased.add(entry);
onReuse(entry);
return entry;
}
// New connection is needed
final int maxPerRoute = getMax(route);
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
if (excess > 0) {
for (int i = 0; i < excess; i++) {
final E lastUsed = pool.getLastUsed();
if (lastUsed == null) {
break;
}
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
}
}
if (pool.getAllocatedCount() < maxPerRoute) {
final int totalUsed = this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity > 0) {
final int totalAvailable = this.available.size();
if (totalAvailable > freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
}
}
final C conn = this.connFactory.create(route);
entry = pool.add(conn);
this.leased.add(entry);
return entry;
}
}
boolean success = false;
try {
pool.queue(future);
this.pending.add(future);
if (deadline != null) {
success = this.condition.awaitUntil(deadline);
} else {
this.condition.await();
success = true;
}
if (future.isCancelled()) {
throw new ExecutionException(operationAborted());
}
} finally {
// In case of 'success', we were woken up by the
// connection pool and should now have a connection
// waiting for us, or else we're shutting down.
// Just continue in the loop, both cases are checked.
pool.unqueue(future);
this.pending.remove(future);
}
// check for spurious wakeup vs. timeout
if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
break;
}
}
throw new TimeoutException("Timeout waiting for connection");
} finally {
this.lock.unlock();
}
}
AbstractConnPool提供了getPoolEntryBlocking,lease内部就是通过这个方法来获取连接的。它主要是通过pool.getFree获取空闲连接,然后进行过期判断,再判断是否close,如果已经close则从available中移除,获取成功的话则从available移除,添加到leased然后返回。如果获取不到则先判断连接是否超出maxPerRoute,超出则先移除最近使用的,之后在没有超出maxPerRoute的条件下通过connFactory.create创建然后返回,超出则放入到pending中进行等待
org/apache/http/impl/conn/PoolingHttpClientConnectionManager.java
static class InternalConnectionFactory implements ConnFactory<HttpRoute, ManagedHttpClientConnection> {
private final ConfigData configData;
private final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory;
InternalConnectionFactory(
final ConfigData configData,
final HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> connFactory) {
super();
this.configData = configData != null ? configData : new ConfigData();
this.connFactory = connFactory != null ? connFactory :
ManagedHttpClientConnectionFactory.INSTANCE;
}
@Override
public ManagedHttpClientConnection create(final HttpRoute route) throws IOException {
ConnectionConfig config = null;
if (route.getProxyHost() != null) {
config = this.configData.getConnectionConfig(route.getProxyHost());
}
if (config == null) {
config = this.configData.getConnectionConfig(route.getTargetHost());
}
if (config == null) {
config = this.configData.getDefaultConnectionConfig();
}
if (config == null) {
config = ConnectionConfig.DEFAULT;
}
return this.connFactory.create(route, config);
}
}
InternalConnectionFactory实现了ConnFactory接口,其create方法委托给ManagedHttpClientConnectionFactory.INSTANCE
public class ManagedHttpClientConnectionFactory
implements HttpConnectionFactory<HttpRoute, ManagedHttpClientConnection> {
private static final AtomicLong COUNTER = new AtomicLong();
public static final ManagedHttpClientConnectionFactory INSTANCE = new ManagedHttpClientConnectionFactory();
private final Log log = LogFactory.getLog(DefaultManagedHttpClientConnection.class);
private final Log headerLog = LogFactory.getLog("org.apache.http.headers");
private final Log wireLog = LogFactory.getLog("org.apache.http.wire");
private final HttpMessageWriterFactory<HttpRequest> requestWriterFactory;
private final HttpMessageParserFactory<HttpResponse> responseParserFactory;
private final ContentLengthStrategy incomingContentStrategy;
private final ContentLengthStrategy outgoingContentStrategy;
//......
public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
CharsetDecoder charDecoder = null;
CharsetEncoder charEncoder = null;
final Charset charset = cconfig.getCharset();
final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
if (charset != null) {
charDecoder = charset.newDecoder();
charDecoder.onMalformedInput(malformedInputAction);
charDecoder.onUnmappableCharacter(unmappableInputAction);
charEncoder = charset.newEncoder();
charEncoder.onMalformedInput(malformedInputAction);
charEncoder.onUnmappableCharacter(unmappableInputAction);
}
final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
return new LoggingManagedHttpClientConnection(
id,
log,
headerLog,
wireLog,
cconfig.getBufferSize(),
cconfig.getFragmentSizeHint(),
charDecoder,
charEncoder,
cconfig.getMessageConstraints(),
incomingContentStrategy,
outgoingContentStrategy,
requestWriterFactory,
responseParserFactory);
}
}
ManagedHttpClientConnectionFactory的create方法创建的是LoggingManagedHttpClientConnection,它继承了DefaultManagedHttpClientConnection增加了logging的特性,而DefaultManagedHttpClientConnection继承了DefaultBHttpClientConnection,声明实现了ManagedHttpClientConnection
AbstractConnPool提供了getPoolEntryBlocking,lease内部就是通过这个方法来获取连接的。它主要是通过pool.getFree获取空闲连接,然后进行过期判断,再判断是否close,如果已经close则从available中移除,获取成功的话则从available移除,添加到leased然后返回。如果获取不到则先判断连接是否超出maxPerRoute,超出则先移除最近使用的,之后在没有超出maxPerRoute的条件下通过connFactory.create创建然后返回,超出则放入到pending中进行等待。
MainClientExec优先通过connManager.requestConnection是经过连接池管理的,如果连接不够用,通过connFactory.create创建新的ManagedHttpClientConnection,最后MainClientExec会再判断一下managedConn是否open,没有的话会通过establishRoute来建立连接(
HttpClientConnectionManager.connect会创建socket然后进行connect然后bind到managedConn
)