前面我们有讲解Kafka的网络通信模型 , 但是那篇文章主要讲的是 作为服务端是如何处理的。
那么,今天我们再来讲一讲 客户端是如何发起请求的。
带着几个问题思考一下
客户端发起请求的几个关键类
该类继承自ByteBufferSend
, 超类是 Send,有以下几个接口
String destination();
boolean completed();
long writeTo(GatheringByteChannel channel) throws IOException;
long size();
它的作用主要是用来缓存待发送的数据的, writeTo
方法会把缓存的数据写入到入参的通道里面。
例如ByteBufferSend
,的写入方法如下。
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
Send接口,还有很多其他的实现类。
客户端的工具类, 只要构建好了NetworkClient,就可以用这个工具类发送请求。
用于异步请求/响应网络 i/o 的网络客户端。这是一个内部类,用于实现面向用户的生产者和消费者客户端。 这个类不是线程安全的!
NetworkClient的一些关键属性
/* 用于执行网络 io 的选择器 */
private final Selectable selector;
/* Metadata元信息的更新器, 他可以尝试更新元信息 */
private final MetadataUpdater metadataUpdater;
/* 每个节点的连接状态 */
private final ClusterConnectionStates connectionStates;
/* 当前正在发送或等待响应的一组请求 */
private final InFlightRequests inFlightRequests;
/* 套接字发送缓冲区大小(以字节为单位) */
private final int socketSendBuffer;
/* 套接字接收大小缓冲区(以字节为单位) */
private final int socketReceiveBuffer;
/* 用于在对服务器的请求中识别此客户端的客户端 ID */
private final String clientId;
/* 向服务器发送请求时使用的当前关联 ID*/
private int correlation;
/* 单个请求等待服务器确认的默认超时*/
private final int defaultRequestTimeoutMs;
//.... 省略
这里构建NetworkClient涉及到的Broker配置有:
属性 | 描述 | 默认 |
---|---|---|
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败。这应该大于replica.lag.time.max.ms(代理配置),以减少由于不必要的生产者重试而导致消息重复的可能性。 | 30000(30 秒) |
socket.connection.setup.timeout.ms | 客户端等待套接字连接建立的时间。如果在超时之前没有建立连接,客户端将关闭套接字通道。 | 10000(10 秒) |
socket.connection.setup.timeout.max.ms | 客户端等待建立套接字连接的最长时间。对于每个连续的连接失败,连接设置超时将成倍增加,直至达到此最大值。为避免连接风暴,将对超时应用 0.2 的随机化因子,从而产生低于计算值 20% 到高于 20% 的随机范围。 | 127000(127 秒) |
请看
// 根据拿到的BrokerNode,和RequestBuilder构建 Request请求
val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
time.milliseconds(), true)
// 发起请求并接受Response
clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
主要的发送请求逻辑就是上面的关键代码, 先构建clientRequest请求,然后用NetworkClientUtils发送请求。
具体代码就不贴出来了, 简要概述一下整个流程吧
maxInFlightRequestsPerConnection
(最大未完成请求数,这个是上层参数决定的) 。当然,如果这个请求的类型是内部请求,是不需要这个判断的。Selector.send(send)
开始发送,这个过程其实是注册SelectionKey.OP_WRITE 事件。当然在这之前会将请求保存起来放到inFlightRequests中,用于后面判断请求数是否超过阈值等等。networkClient.poll
获取Response, 直到结束。客户端发起请求,总共分为以下几个场景。
关键类 ControllerChannelManager
Controller会向Broker发起一些请求,比如UpdateMetadataRequest 更新元信息请求。
在Controller重新选举初始化的时候,或者有新的Broker启动上线之后, Controller节点会执行添加Broker的操作。
ControllerChannelManager#addBroker
private def addNewBroker(broker: Broker): Unit = {
val messageQueue = new LinkedBlockingQueue[QueueItem]
// 获取内部Broker之间通信的监听器名称
val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
// 读取内部Broker之间通信的安全协议
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
// 根据监听器名称选择合适的节点和监听器名称
val brokerNode = broker.node(controllerToBrokerListenerName)
// 省略部分
.......
val networkClient = new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
// 一次只能发一个请求,保证顺序性
1,
0,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,
false,
new ApiVersions,
logContext
)
// 省略部分
.......
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
}
PS: 这里传入的maxInFlightRequestsPerConnection
是1,也就说Controller给某个Broker发送请求同一时间只有一个请求。确保请求的顺序性。
在Kafka启动过程中,会构建一个brokerToControllerChannelManager 的实例。这个是专门管理Broker向Controller发起请求的类,里面有一个BrokerToControllerRequestThread线程负责真正的想Controller发起请求。
brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
brokerToControllerChannelManager.start()
线程名格式:自定义前缀:broker-${config.brokerId}-to-controller-send-thread
可以看看他里面的类, 也是先构建networkClient, 然后发起请求。具体构建就不再分析了,跟上面的Controller2Broker一样。但是列出几个重点需要注意的地方:
PS: 这里传入的maxInFlightRequestsPerConnection
也是1,也就说Broker给Controller发送请求同一时间只有一个请求。确保请求的顺序性。
Broker之间的请求, 例如 AbstractFetcherThread 副本同步线程。 Follower去Leader Fetch数据,FetchRequest 请求, 那么他们的通信又是什么样子呢?
基本上都是差不多的, 需要注意几个问题
inter.broker.listener.name
配置去匹配对应的EndPoint。Broker2Broker是属于内部Broker之间的请求。具体的代码在 ReplicaManager#makeFollowers
maxInFlightRequestsPerConnection
也是1,也就说Broker给Controller发送请求同一时间只有一个请求。确保请求的顺序性。这个就是 例如 Producer 和 Consumer 等等向Broker发起请求模块。
方式都是一样的,构建自己的 networkClient,配置不同属性。