上一篇把 Executor 注册到 Driver 的过程进行了详尽的描述。并且把四次往复的过程用图和代码都做了说明,虽然后面的注册 Executor 的部分没有详细再画图,但是起过程和第一次确认 Driver 端服务的过程大体相同,如有问题可以给我留言我们来互动沟通。
上述过程中,在 Executor 的 client 端是如何构建了 socket,如何发送的请求,这部分细节是本章要探讨的主要内容。
这部分内容,其实是我第一篇 Spark 的源代码文章中就讲过的,但是当时讲的方法有点啰嗦,很多同学看完告诉我还是太硬核了,参考这里。当时还手工的画了一个下图。
所以我考虑再三后,重新把 spark 的内容进行了梳理后,写的现在的大画 spark 系列。本次再讲解的话,主要还是注重画图,减少大幅贴代码。
这个过程中,有两个地方相对优雅一些
send处理来poll出然后进行发送处理。在这个过程中,第一次连接服务端,则会初始化 client,即懒加载,并且,初始化 client 的过程也是一个异步的过程。会有一个专门的线程服务来进行 client 的初始化,当初始化结束后,会持续之前的发送流程,直到整个发送结束。如上图所示,整个过程可以看作是 1-16 个过程。我举的例子是第一次连接 Driver 的场合,所以这 16 个步骤都会走到,而连接一次以后,后续因为 client 已经存在,初始化 client 的操作就会被省略。
CoarseGrainedExecutorBackend的onStart方法,这个在上一篇中有介绍NettyRpcEnv的asyncSetupEndpointRefByURI方法EndpointRef进行调用,所有的EndpointRef都是NettyRpcEndpointRef的实例,只是传入的Endpoint的 name 不同,通过这个 name 会在目标的 server 去匹配到底哪一个Endpoint的实例解析相应的请求NettyRpcEndpointRef中会调用回NettyRpcEnv的ask方法postToOutbox的方法,这个方法通过名字也可以看出,其引出了一个新的结构体,就是OutBox,所有从 client 端发出去的消息都是通过这个OutBox发出去的send的流程开始的drainOutbox的方法内,这个方法顾名思义,就是要开始对Outbox中的消息进行发送操作OutBox中持有的 client 变量是否为null,这个 client 变量即TransportClient的实例,当第一次进入到这里的时候,client 一定是 null,所以判断 is null 一定是 yes,所以会走到第 9 步nettyEnv.*clientConnectionExecutor*.submit去启动一个线程 task 来去做 client 的初始化,而原来的线程的操作会被 returnNettyRpcEnv的createClient方法,从而可以初始化出一个 client,也就是TransportClient的实例drainOutbox方法,即和步骤 7 调用的一致RpcOutboxMessage的sendWith的方法TransportClient的sendRpc方法,通过这个方法, 可以利用 Netty 的 Channel 把消息发送出去,并且保存发总的requestId,方便收到response的时候判断是之前哪一个 reqeust 的消息,从而回掉发送消息时缓存的callback方法,从而完成一次一来一回的发送接收过程通过以上的过程,Executor 就建立起了和 Driver 端的连接,通过这个连接,后续可以继续通过上述的 4 的ask方法来发送RpcRequestMessage,如果是发送的OneWayMessage,则使用send方法。从英文中的 ask 和 send 也可以分辨出来,ask 是询问,需要 answer,而 send 只是发送,不需要回答,所以从方法名也可以看出是否需要 response。具体RpcRequestMessage与OneWayMessage的区别参考这里
在 spark 中的 client,具有双重含义。
并不是只在类似于 Executor 的 Client 端存在,而是
TransportClient
TransportClient
上面要如何理解呢?首先要明确以下几个逻辑概念
TransportClient,并不是一个只存在于物理意义上的 client 的 java 的 class,而是,在物理上 client 与 server 中都存在,目的是握取操作系统底层的对外联通的channel,通过TransportClient找到channel,进而向外发送出消息TransportClient并没有被初始化,而是在我上一节讲解当中描述的,会有一个 lazy 实例化的过程TransportClient的实例化是根据每连上一个物理 client 而动态创建出来的。这里需要一些 Netty 与底层网络的基础知识了,我们不去深究,暂时记住这一点即可,可以参考下面的图来理解首先,通过这个图,我们先 High level 的理解一下数据传输的过程,以及 channel 的构建过程
TransportClient构建出自己的 channel 联通 server 端知道了以上的知识基础后,我们会发现,在 client 与 server 都存在 channel,在 client 端 channel 是属于TransportClient的,在 server 端是如何操作的呢?
答案是,在 server 端也是TransportClient 持有 channel,这里,spark 做的还是比较优雅,它的TransportClient是公共的,无论是 client 还是 server 都有TransportClient,其中都有 channel,它的逻辑含义都是通过TransportClient所持有的 channel 可以相互通信,而这个 channel 和TransportClient又是如何配合整体的架构体系所存在的呢,继续画图
**TransportClient与 channel 是如何在大框架中存在的**
**TransportClient**来持有 channel 来给对方发送消息,主要注意的是,消息并不是只有 client 端发送给 server 端,一旦 connection 联通后,双方是可以对等的给对方发送消息的,这是网路底层的基础原理TransportChannelHandler来进行第一步的分发处理再借助一个以前的图来加深印象,在 driver 代表的 Physical server 与 Executor 代表的 Physical client 的构建与结构,可以看到,双方其实除了在底层网络的 server 层面有差异,其实其余部分大体相同
executor发送消息出来到driver通过executor的TransportClient把消息放入channel,发送出来driver内的流转和转换,最终来到处理消息的Endpoint
driver发送回client,通过载driver中掌握的executor的EndpointRef的TransportClient把消息放入channel,从而发给executor
driver到底是如何获取到的executor的EndpointRef的,或者说,最终是通过TransportClient发送的response回去,这个TransportClient与channel是如何与EndpointRef集成到一起的呢?下一篇会详尽描述DriverEndpoint主动发出调用了持有的executor的EndpointRef
EndpointRef的TransportClient把消息放入channel,从而发给executor,executor接到后调用相应的callback处理本篇其实和上一篇差不多,是上一篇的概括版,把TransportClient的逻辑细节又做了详尽的阐述。很多开发同学都会把自己陷入到网络就是 http 协议的怪圈中去,其实底层的网络 client 和 server 一旦联通之后,双方的逻辑是相同的,你可以把它比做是 websocket 协议,client→server 与 server→client 是对等的。基于此,在 spark 中,executor(client 端)主动向 driver(server 端)发送注册申请,注册之后,driver 获取了 executor 的连接(TransportClient 与 channel),才可能通过 driver(server 端)来下发任务给 executor(client 端),这块的细节,下一篇继续硬核攻击。