原文作者:星宸2021 原文链接:https://juejin.cn/post/7000360176834330638
1.梳理okhttp的整体流程 2.Java和kotlin版本的对比 (Java版本为3.14.x) 3.流程梳理都在Java版本中,kotlin作为一个对比
OkHttp works on Android 5.0+ (API level 21+) and Java 8+. 要求在Android5.0以上的版本上且jdk版本为jdk8
引入依赖
//新版库采用kotlin语言编写
implementation("com.squareup.okhttp3:okhttp:4.9.1")
发送请求
以get请求为例
OkHttpClient client = new OkHttpClient();
//同步请求
Response response = client.newCall(request).execute();
//异步请求
client.newCall(request).enqueue(new Callback() {
@Override
public
void
onFailure(Call call, IOException e)
{
}
@Override
public
void
onResponse(Call call, Response response)
throws IOException {
}
下面的拦截器顺序都是递增的
// Call the next interceptor in the chain.
//调用链的下一个拦截器
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request); //(1)
Interceptor interceptor = interceptors.get(index); //(2)
Response response = interceptor.intercept(next); //(3)
1.实例化下一个拦截器对应的RealIterceptorChain
对象,这个对象会在传递给当前的拦截器
interceptors
是存放拦截器的ArryList
intercept()
方法,并将下一个拦截器的RealIterceptorChain
对象传递下去interceptor
,第一个调用的就是retryAndFollowUpInterceptor
1.拦截器用了责任链设计模式
,它将请求一层一层向下传,知道有一层能够得到Response就停止向下传递
2.然后将response
向上面的拦截器传递,然后各个拦截器会对respone
进行一些处理,最后会传到RealCall
类中通过execute
来得到response
简而言之:每一个拦截器都对应一个 RealInterceptorChain ,然后每一个interceptor 再产生下一个RealInterceptorChain,直到 List 迭代完成,如下图所示
image.png
在开始下面的内容前,我们先简单的对Dispatcher有个认识 👇👇👇
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//正在准备中的异步请求队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//运行中的异步请求
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//同步请求
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
......
}
1.有一个最大请求次数:64
2.有一个最大请求主机数:5
3.有一个懒加载的线程池,当执行 executorService() 方式时才创建
4.有三个队列(准备中的异步请求 | 运行中的异步请求 | 同步请求)
OkHttpClient client = new OkHttpClient()
这部分中Java和kotlin中没有什么区别,都用了 建造者模式 ,Builder里面的可配置参数也是一样的
public OkHttpClient() { this(new Builder());}OkHttpClient(Builder builder) { ....}public static final class Builder { Dispatcher dispatcher;// 分发器 @Nullable Proxy proxy; List<Protocol> protocols; List<ConnectionSpec> connectionSpecs;// 传输层版本和连接协议 final List<Interceptor> interceptors = new ArrayList<>();// 拦截器 final List<Interceptor> networkInterceptors = new ArrayList<>(); EventListener.Factory eventListenerFactory; ProxySelector proxySelector; CookieJar cookieJar; @Nullable Cache cache; @Nullable InternalCache internalCache;// 内部缓存 SocketFactory socketFactory; @Nullable SSLSocketFactory sslSocketFactory;// 安全套接层socket 工厂,用于HTTPS @Nullable CertificateChainCleaner certificateChainCleaner;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。 HostnameVerifier hostnameVerifier;// 验证确认响应证书 适用 HTTPS 请求连接的主机名。 CertificatePinner certificatePinner;// 证书锁定,使用CertificatePinner来约束哪些认证机构被信任。 Authenticator proxyAuthenticator;// 代理身份验证 Authenticator authenticator;// 身份验证 ConnectionPool connectionPool;// 连接池 Dns dns; boolean followSslRedirects; // 安全套接层重定向 boolean followRedirects;// 本地重定向 boolean retryOnConnectionFailure;// 重试连接失败 int callTimeout; int connectTimeout; int readTimeout; int writeTimeout; int pingInterval; // 这里是默认配置的构建参数 public Builder() { dispatcher = new Dispatcher(); protocols = DEFAULT_PROTOCOLS; connectionSpecs = DEFAULT_CONNECTION_SPECS; ... } // 这里传入自己配置的构建参数 Builder(OkHttpClient okHttpClient) { this.dispatcher = okHttpClient.dispatcher; this.proxy = okHttpClient.proxy; this.protocols = okHttpClient.protocols; this.connectionSpecs = okHttpClient.connectionSpecs; this.interceptors.addAll(okHttpClient.interceptors); this.networkInterceptors.addAll(okHttpClient.networkInterceptors); ... }
open class OkHttpClient internal constructor( builder: Builder) : Cloneable, Call.Factory, WebSocket.Factory { //... constructor() : this(Builder()) //... internal constructor(okHttpClient: OkHttpClient) : this() { this.dispatcher = okHttpClient.dispatcher this.connectionPool = okHttpClient.connectionPool this.interceptors += okHttpClient.interceptors this.networkInterceptors += okHttpClient.networkInterceptors this.eventListenerFactory = okHttpClient.eventListenerFactory this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure this.authenticator = okHttpClient.authenticator this.followRedirects = okHttpClient.followRedirects this.followSslRedirects = okHttpClient.followSslRedirects this.cookieJar = okHttpClient.cookieJar this.cache = okHttpClient.cache this.dns = okHttpClient.dns this.proxy = okHttpClient.proxy this.proxySelector = okHttpClient.proxySelector this.proxyAuthenticator = okHttpClient.proxyAuthenticator this.socketFactory = okHttpClient.socketFactory this.sslSocketFactoryOrNull = okHttpClient.sslSocketFactoryOrNull this.x509TrustManagerOrNull = okHttpClient.x509TrustManager this.connectionSpecs = okHttpClient.connectionSpecs this.protocols = okHttpClient.protocols this.hostnameVerifier = okHttpClient.hostnameVerifier this.certificatePinner = okHttpClient.certificatePinner this.certificateChainCleaner = okHttpClient.certificateChainCleaner this.callTimeout = okHttpClient.callTimeoutMillis this.connectTimeout = okHttpClient.connectTimeoutMillis this.readTimeout = okHttpClient.readTimeoutMillis this.writeTimeout = okHttpClient.writeTimeoutMillis this.pingInterval = okHttpClient.pingIntervalMillis this.minWebSocketMessageToCompress = okHttpClient.minWebSocketMessageToCompress this.routeDatabase = okHttpClient.routeDatabase }}
OkHttpClient client = new OkHttpClient();//同步请求Response response = client.newCall(request).execute();
通过创建完OkHttpClient对象,调用内部的 newCall() 方法,将最终的请求交给RealCall的 execute() 方法,在该方法内部处理
1.确保Call方法只执行一次(有版本区别,请看下文) 2.通知dispatcher进入执行状态 3.通过一系列的拦截器的请求处理和响应处理得到最终的结果 4告诉dispatcher已经执行完毕
/*** Prepares the {@code request} to be executed at some point in the future.*/@Override public Call newCall(Request request) { return RealCall.newRealCall(this, request, false /* for web socket */);}// RealCall为真正的请求执行者static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { // Safely publish the Call instance to the EventListener. RealCall call = new RealCall(client, originalRequest, forWebSocket); call.eventListener = client.eventListenerFactory().create(call); return call;}
private boolean executed;@Override public Response execute() throws IOException { synchronized (this) { // 每个Call只能执行一次 if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); timeout.enter(); eventListener.callStart(this); try { // 通知dispatcher已经进入执行状态 client.dispatcher().executed(this); // 通过一系列的拦截器请求处理和响应处理得到最终的返回结果 Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } catch (IOException e) { e = timeoutExit(e); eventListener.callFailed(this, e); throw e; } finally { // 通知 dispatcher 自己已经执行完毕 client.dispatcher().finished(this); }}
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); // 在配置 OkHttpClient 时设置的 interceptors; interceptors.addAll(client.interceptors()); // 负责失败重试以及重定向 interceptors.add(retryAndFollowUpInterceptor); // 请求时,对必要的Header进行一些添加,接收响应时,移除必要的Header interceptors.add(new BridgeInterceptor(client.cookieJar())); // 负责读取缓存直接返回、更新缓存 interceptors.add(new CacheInterceptor(client.internalCache())); // 负责和服务器建立连接 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { // 配置 OkHttpClient 时设置的 networkInterceptors interceptors.addAll(client.networkInterceptors()); } // 负责向服务器发送请求数据、从服务器读取响应数据 interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); // 使用责任链模式开启链式调用 return chain.proceed(originalRequest);}// StreamAllocation 对象,它相当于一个管理类,维护了服务器连接、并发流// 和请求之间的关系,该类还会初始化一个 Socket 连接对象,获取输入/输出流对象。public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { ... // Call the next interceptor in the chain. // 实例化下一个拦截器对应的RealIterceptorChain对象 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); // 得到当前的拦截器 Interceptor interceptor = interceptors.get(index); // 调用当前拦截器的intercept()方法,并将下一个拦截器的RealIterceptorChain对象传递下去,最后得到响应 Response response = interceptor.intercept(next); ... return response;
//newcall真正的执行在RealCall类override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
RealCall.kt
private val executed = AtomicBoolean()override fun execute(): Response { check(executed.compareAndSet(false, true)) { "Already Executed" } timeout.enter() callStart() try { client.dispatcher.executed(this) return getResponseWithInterceptorChain() } finally { client.dispatcher.finished(this) } }
@Throws(IOException::class) internal fun getResponseWithInterceptorChain(): Response { // Build a full stack of interceptors. val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket) val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) var calledNoMoreExchanges = false try { val response = chain.proceed(originalRequest) if (isCanceled()) { response.closeQuietly() throw IOException("Canceled") } return response } catch (e: IOException) { calledNoMoreExchanges = true throw noMoreExchanges(e) as Throwable } finally { if (!calledNoMoreExchanges) { noMoreExchanges(null) } } }
在Java版本中通过使用 synchronized 关键字来保证保证线程安全,并且确保executed只会被执行一次
kotlin版本中直接移除了 synchronized 关键字,并且将executed字段设置为具有原子性特征的boolean值,且通过CAS操作去确保是否已经执行了
OkHttpClient client = new OkHttpClient();Request request = new Request.Builder() .url(url) .build();client.newCall(request).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { }
public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); eventListener.callStart(this); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }
从上面看到,通过加锁后确保只会执行一次后,将当前的请求交给dispatcher拦截器中的enqueue()方法执行,那么我们来看看这个代码
Dispatcher.java
void enqueue(AsyncCall call) { synchronized (this) { readyAsyncCalls.add(call); if (!call.get().forWebSocket) { AsyncCall existingCall = findExistingCallWithHost(call.host()); if (existingCall != null) call.reuseCallsPerHostFrom(existingCall); } } promoteAndExecute(); }
在拦截器中的方法中,发现它首先将这个请求添加到了readyAsyncCalls这个队列中,你问我怎么就知道它是队列了?你还记得上面我说的吗?👆👆👆
添加到队列后接着执行**promoteAndExecute()**方法
private boolean promoteAndExecute() { assert (!Thread.holdsLock(this)); List<AsyncCall> executableCalls = new ArrayList<>(); boolean isRunning; synchronized (this) { for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall asyncCall = i.next(); if (runningAsyncCalls.size() >= maxRequests) break; //判断是否大于最大的请求数 if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // 判断主机数是否已经达到最大. // 如果其中的runningAsynCalls不满,且call占用的host小于最大数量,则将call加入到runningAsyncCalls中执行, //利用线程池执行call否者将call加入到readyAsyncCalls中。 i.remove(); asyncCall.callsPerHost().incrementAndGet(); executableCalls.add(asyncCall); runningAsyncCalls.add(asyncCall); } isRunning = runningCallsCount() > 0; } for (int i = 0, size = executableCalls.size(); i < size; i++) { AsyncCall asyncCall = executableCalls.get(i); asyncCall.executeOn(executorService()); } return isRunning; }
call
加入到线程池中执行了。现在再看AsynCall的
代码,它是RealCall
中的内部类
final class AsyncCall extends NamedRunnable { private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } String host() { return originalRequest.url().host(); } Request request() { return originalRequest; } RealCall get() { return RealCall.this; } /** * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */ void executeOn(ExecutorService executorService) { assert (!Thread.holdsLock(client.dispatcher())); boolean success = false; try { executorService.execute(this); success = true; } catch (RejectedExecutionException e) { InterruptedIOException ioException = new InterruptedIOException("executor rejected"); ioException.initCause(e); eventListener.callFailed(RealCall.this, ioException); responseCallback.onFailure(RealCall.this, ioException); } finally { if (!success) { client.dispatcher().finished(this); // This call is no longer running! } } } @Override protected void execute() { boolean signalledCallback = false; timeout.enter(); try { // 跟同步执行一样,最后都会调用到这里 Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { e = timeoutExit(e); if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }}
override fun enqueue(responseCallback: Callback) { check(executed.compareAndSet(false, true)) { "Already Executed" } callStart() client.dispatcher.enqueue(AsyncCall(responseCallback)) }
internal fun enqueue(call: AsyncCall) { synchronized(this) { readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() }
private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf<AsyncCall>() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
inner class AsyncCall( private val responseCallback: Callback ) : Runnable { @Volatile var callsPerHost = AtomicInteger(0) private set fun reuseCallsPerHostFrom(other: AsyncCall) { this.callsPerHost = other.callsPerHost } val host: String get() = originalRequest.url.host val request: Request get() = originalRequest val call: RealCall get() = this@RealCall /** * Attempt to enqueue this async call on [executorService]. This will attempt to clean up * if the executor has been shut down by reporting the call as failed. */ fun executeOn(executorService: ExecutorService) { client.dispatcher.assertThreadDoesntHoldLock() var success = false try { executorService.execute(this) success = true } catch (e: RejectedExecutionException) { val ioException = InterruptedIOException("executor rejected") ioException.initCause(e) noMoreExchanges(ioException) responseCallback.onFailure(this@RealCall, ioException) } finally { if (!success) { client.dispatcher.finished(this) // This call is no longer running! } } } override fun run() { threadName("OkHttp ${redactedUrl()}") { var signalledCallback = false timeout.enter() try { val response = getResponseWithInterceptorChain() signalledCallback = true responseCallback.onResponse(this@RealCall, response) } catch (e: IOException) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e) } else { responseCallback.onFailure(this@RealCall, e) } } catch (t: Throwable) { cancel() if (!signalledCallback) { val canceledException = IOException("canceled due to $t") canceledException.addSuppressed(t) responseCallback.onFailure(this@RealCall, canceledException) } throw t } finally { client.dispatcher.finished(this) } } } }
AysncCall
中的execute()
中的方法,同样是通过Response response = getResponseWithInterceptorChain();
来获得response,这样异步任务也同样通过了
首先,请求的时候初始化一个Call的实例,然后执行它的**execute()方法或enqueue()方法,内部最后都会执行到getResponseWithInterceptorChain()**方法,
这个方法通过拦截器组成的责任链模式,依次经过用户自定义普通拦截器、重试拦截器(RetryAndFollowUpInterceptor)、桥接拦截器(BridgeInterceptor)、缓存拦截器(BridgeInterceptor)、连接拦截器(CallServerInterceptor)和用户自定义网络拦截器以及访问服务器拦截器等拦截处理过程,最终将获取到的响应结果交给用户
最后上图
image.png
在Okhttp整个过程中,比较重要的两个拦截器,缓存拦截器和连接拦截器,关于缓存拦截器在文一开始的时候就简单的说了下👆👆👆
现在说下另一个比较重要的拦截器
@Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChain) chain; Request request = realChain.request(); StreamAllocation streamAllocation = realChain.streamAllocation(); // We need the network to satisfy this request. Possibly for validating a conditional GET. boolean doExtensiveHealthChecks = !request.method().equals("GET"); // HttpCodec是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。在这个方法的内部实现连接池的复用处理 HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); RealConnection connection = streamAllocation.connection(); return realChain.proceed(request, streamAllocation, httpCodec, connection);}
通过调用了 streamAllocation 的 newStream() 方法的时候,经过一系列判断到达 StreamAllocation类 中的 findConnection() 方法
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { ... // Attempt to use an already-allocated connection. We need to be careful here because our // already-allocated connection may have been restricted from creating new streams. // 尝试使用已分配的连接,已经分配的连接可能已经被限制创建新的流 releasedConnection = this.connection; // 释放当前连接的资源,如果该连接已经被限制创建新的流,就返回一个Socket以关闭连接 toClose = releaseIfNoNewStreams(); if (this.connection != null) { // We had an already-allocated connection and it's good. result = this.connection; releasedConnection = null; } if (!reportedAcquired) { // If the connection was never reported acquired, don't report it as released! // 如果该连接从未被标记为获得,不要标记为发布状态,reportedAcquired 通过 acquire() 方法修改 releasedConnection = null; } if (result == null) { // Attempt to get a connection from the pool. // 尝试供连接池中获取一个连接 Internal.instance.get(connectionPool, address, this, null); if (connection != null) { foundPooledConnection = true; result = connection; } else { selectedRoute = route; } } } // 关闭连接 closeQuietly(toClose); if (releasedConnection != null) { eventListener.connectionReleased(call, releasedConnection); } if (foundPooledConnection) { eventListener.connectionAcquired(call, result); } if (result != null) { // If we found an already-allocated or pooled connection, we're done. // 如果已经从连接池中获取到了一个连接,就将其返回 return result; } // If we need a route selection, make one. This is a blocking operation. boolean newRouteSelection = false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); } synchronized (connectionPool) { if (canceled) throw new IOException("Canceled"); if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. // 根据一系列的 IP地址从连接池中获取一个链接 List<Route> routes = routeSelection.getAll(); for (int i = 0, size = routes.size(); i < size;i++) { Route route = routes.get(i); // 从连接池中获取一个连接 Internal.instance.get(connectionPool, address, this, route); if (connection != null) { foundPooledConnection = true; result = connection; this.route = route; break; } } } if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. // 在连接池中如果没有该连接,则创建一个新的连接,并将其分配,这样我们就可以在握手之前进行终端 route = selectedRoute; refusedStreamCount = 0; result = new RealConnection(connectionPool, selectedRoute); acquire(result, false); } } // If we found a pooled connection on the 2nd time around, we're done. if (foundPooledConnection) { // 如果我们在第二次的时候发现了一个池连接,那么我们就将其返回 eventListener.connectionAcquired(call, result); return result; } // Do TCP + TLS handshakes. This is a blocking operation. // 进行 TCP 和 TLS 握手 result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); routeDatabase().connected(result.route()); Socket socket = null; synchronized (connectionPool) { reportedAcquired = true; // Pool the connection. // 将该连接放进连接池中 Internal.instance.put(connectionPool, result); // If another multiplexed connection to the same address was created concurrently, then // release this connection and acquire that one. // 如果同时创建了另一个到同一地址的多路复用连接,释放这个连接并获取那个连接 if (result.isMultiplexed()) { socket = Internal.instance.deduplicate(connectionPool, address, this); result = connection; } } closeQuietly(socket); eventListener.connectionAcquired(call, result); return result;}
通过上面的源码也可以证明我们开头的结论是正确的,添加连接池里面去了,简单来说,连接复用省去了TCP和TLS握手的过程,因为建立连接本身也是需要消耗时间的,连接复用后就可以提升网络访问效率
最后说下ConnectionPool的作用
public final class ConnectionPool {private final Deque<RealConnection> connections = new ArrayDeque<>(); //...... void put(RealConnection connection) { assert (Thread.holdsLock(this)); if (!cleanupRunning) { cleanupRunning = true; executor.execute(cleanupRunnable); } connections.add(connection); } private final Runnable cleanupRunnable = () -> { while (true) { long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } };}
创建一个新的连接的时候,一方面需要它放进缓存里面另一边,另一方面对缓存进行清理。在 ConnectionPool 中,当我们向连接池中缓存一个连接的时候,只要调用双端队列的 add() 方法,将其加入到双端队列即可,而清理连接缓存的操作则交给线程池来定时执行
object ConnectInterceptor : Interceptor { @Throws(IOException::class) override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange) return connectedChain.proceed(realChain.request) }}
1.为什么在Java版本的时候用锁,kotlin中用的是带有原子性的属性值并且是通过CAS操作呢?
2.为什么要用队列的形式存储数据?用链表可以吗?
3.拦截器是怎么工作的,怎么进行传递和响应数据的?
4.如何自定义拦截器?怎么添加配置?