前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划10 | NettyRpcEnv客户端消息发送逻辑

Spark Core源码精读计划10 | NettyRpcEnv客户端消息发送逻辑

作者头像
大数据真好玩
发布2019-08-08 15:49:39
9160
发布2019-08-08 15:49:39
举报
文章被收录于专栏:暴走大数据

前言

在上一篇文章中,我们了解了NettyRpcEnv内的调度器Dispatcher的内部细节。Dispatcher涉及到的主要是消息接收、路由与处理的机制,也就是NettyRpcEnv作为服务端应该具备的功能。既然它的名字叫“RPC环境”,那么就应该既能接收,也能发送消息。本文就主要来看一看NettyRpcEnv作为客户端向远端端点发送消息的逻辑。

NettyRpcEnv与消息发送相关的成员

这些成员有些在代码#8.5中出现过,但当时只讲了几个基础的含义,并没有细说。下面再详细列举一次。

代码#10.1 - NettyRpcEnv与消息发送相关的成员

代码语言:javascript
复制
  private def createClientBootstraps(): java.util.List[TransportClientBootstrap] = {
    if (securityManager.isAuthenticationEnabled()) {
      java.util.Arrays.asList(new AuthClientBootstrap(transportConf,
        securityManager.getSaslUser(), securityManager))
    } else {
      java.util.Collections.emptyList[TransportClientBootstrap]
    }
  }

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
clientFactory、fileDownloadFactory

这两个成员的类型是TransportClientFactory,通过传输上下文TransportContext的createClientFactory()方法创建。这个工厂类在NettyRpcEnv里用于生产TransportClient,即RPC客户端。

clientFactory用来处理一般性的请求发送和应答接收,后面的分析中主要用到它。而fileDownloadFactory专门用于下载文件,所以它不会立即初始化,而是按需创建。

timeoutScheduler

它的类型是ScheduledThreadPoolExecutor,即Java中的定时线程池。它通过ThreadUtils工具类中的对应方法创建,且默认只有一条守护线程。它用来专门处理RPC请求超时。

clientConnectionExecutor

它的类型是ThreadPoolExecutor,实际上是一个缓冲的守护线程池。来看看ThreadUtils中创建它的方法,顺便复习一下线程池的七大参数吧。在读源码的过程中随时温习基础知识十分有益。

代码#10.2 - o.a.s.util.ThreadUtils.newDaemonCachedThreadPool()方法

代码语言:javascript
复制
  def newDaemonCachedThreadPool(
      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    val threadPool = new ThreadPoolExecutor(
      maxThreadNumber,                    // corePoolSize
      maxThreadNumber,                    // maximumPoolSize
      keepAliveSeconds,                   // keepAliveTime
      TimeUnit.SECONDS,                   // timeUnit
      new LinkedBlockingQueue[Runnable],  // workQueue
      threadFactory)                      // threadFactory
                                          // rejectedExecutionHandler (default)
    threadPool.allowCoreThreadTimeOut(true)
    threadPool
  }

这个线程池专门来处理TransportClient的创建,因为TransportClientFactory.createClient()方法本身是一个阻塞调用,因此必须用线程池来异步处理它。线程池大小可以用配置项spark.rpc.connect.threads调节,默认为64。

outboxes

还记得文章#9中的“收件箱”Inbox么?这里该出现与其对应的“发件箱”Outbox了。outboxes维护有远端RPC地址与各个发件箱的映射,需要发送的消息首先会放入Outbox中,再进行处理。所有的消息都继承自OutboxMessage特征。

下面我们就以Outbox为起点探索NettyRpcEnv中消息的发送。

发件箱Outbox相关逻辑

OutboxMessage

OutboxMessage特征非常简单,只声明了两个方法:sendWith()和onFailure()。它也只有两个实现类,分别是无需应答的消息OneWayOutboxMessage和需要应答的消息RpcOutboxMessage。以RpcOutboxMessage为例,其代码如下,比较容易理解,就不多废话了。

代码#10.3 - o.a.s.rpc.netty.RpcOutboxMessage类

代码语言:javascript
复制
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback with Logging {
  private var client: TransportClient = _
  private var requestId: Long = _

  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    this.requestId = client.sendRpc(content, this)
  }

  def onTimeout(): Unit = {
    if (client != null) {
      client.removeRpcRequest(requestId)
    } else {
      logError("Ask timeout before connecting successfully")
    }
  }

  override def onFailure(e: Throwable): Unit = {
    _onFailure(e)
  }

  override def onSuccess(response: ByteBuffer): Unit = {
    _onSuccess(client, response)
  }
}
消息处理

Outbox.send()方法用于真正发送消息,其代码如下。

代码#10.4 - o.a.s.rpc.netty.Outbox.send()方法

代码语言:javascript
复制
  @GuardedBy("this")
  private val messages = new java.util.LinkedList[OutboxMessage]

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

其中,messages与Inbox中相同,是一个普通的链表,所以要用synchronized保证同步。如果Outbox不是停止状态的话,就将OutboxMessage添加到链表中,然后调用drainOutbox()方法处理消息。

代码#10.4 - o.a.s.rpc.netty.Outbox.drainOutbox()方法

代码语言:javascript
复制
  @GuardedBy("this")
  private var connectFuture: java.util.concurrent.Future[Unit] = null
  @GuardedBy("this")
  private var draining = false

  private def drainOutbox(): Unit = {
    var message: OutboxMessage = null
    synchronized {
      if (stopped) {
        return
      }
      if (connectFuture != null) {
        return
      }
      if (client == null) {
        launchConnectTask()
        return
      }
      if (draining) {
        return
      }
      message = messages.poll()
      if (message == null) {
        return
      }
      draining = true
    }
    while (true) {
      try {
        val _client = synchronized { client }
        if (_client != null) {
          message.sendWith(_client)
        } else {
          assert(stopped == true)
        }
      } catch {
        case NonFatal(e) =>
          handleNetworkFailure(e)
          return
      }
      synchronized {
        if (stopped) {
          return
        }
        message = messages.poll()
        if (message == null) {
          draining = false
          return
        }
      }
    }
  }

从这段代码可以看出,当Outbox遇到以下三种情况之一,则不处理消息,直接返回:

  • Outbox已经停止,或者仍然在连接远端的RPC端点;
  • TransportClient本身为空,说明还没有创建RPC客户端,此时应该先创建它;
  • 有其他线程已经在处理消息了。

如果没有异常情况的话,就从messages表中取出消息,将标志draining设为true,并调用OutboxMessage.sendWith()方法发送之。来看看创建RPC客户端的方法launchConnectTask()。

代码#10.5 - o.a.s.rpc.netty.Outbox.launchConnectTask()方法

代码语言:javascript
复制
  private def launchConnectTask(): Unit = {
    connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {
      override def call(): Unit = {
        try {
          val _client = nettyEnv.createClient(address)
          outbox.synchronized {
            client = _client
            if (stopped) {
              closeClient()
            }
          }
        } catch {
          case ie: InterruptedException =>
            return
          case NonFatal(e) =>
            outbox.synchronized { connectFuture = null }
            handleNetworkFailure(e)
            return
        }
        outbox.synchronized { connectFuture = null }
        drainOutbox()
      }
    })
  }

这个方法中用到了上述clientConnectionExecutor线程池来提交一个Callable,其内部会最终调用clientFactory.createClient()方法来创建RPC客户端。创建成功之后,再次调用drainOutbox()方法试图处理消息。

向Outbox投递消息

向Outbox投递消息的逻辑位于NettyRpcEnv.postToOutbox()方法中。

代码#10.6 - o.a.s.rpc.netty.NettyRpcEnv.postToOutbox()方法

代码语言:javascript
复制
  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      message.sendWith(receiver.client)
    } else {
      require(receiver.address != null,
        "Cannot send message to client endpoint with no listen address.")
      val targetOutbox = {
        val outbox = outboxes.get(receiver.address)
        if (outbox == null) {
          val newOutbox = new Outbox(this, receiver.address)
          val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
          if (oldOutbox == null) {
            newOutbox
          } else {
            oldOutbox
          }
        } else {
          outbox
        }
      }
      if (stopped.get) {
        outboxes.remove(receiver.address)
        targetOutbox.stop()
      } else {
        targetOutbox.send(message)
      }
    }
  }

由此可见,如果已经持有了远端RPC端点引用对应的TransportClient,就直接调用OutboxMessage.sendWith()方法来发送。但如果没有持有TransportClient的话,就先从outboxes缓存中获取RPC地址对应的发件箱,如果也没有发件箱,就要创建一个出来。最后,在当前NettyRpcEnv和Outbox本身都未停止的前提下,调用send()方法发送消息。

NettyRpcEnv发送消息的方法

ask()方法

ask()方法的作用在文章#8中讲过,即“异步发送一条消息,并在规定的超时时间内等待RPC端点的回复”。其实现方法如下。

代码#10.7 - o.a.s.rpc.netty.NettyRpcEnv.ask()方法

代码语言:javascript
复制
  private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
    val promise = Promise[Any]()
    val remoteAddr = message.receiver.address

    def onFailure(e: Throwable): Unit = {
      if (!promise.tryFailure(e)) {
        e match {
          case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
          case _ => logWarning(s"Ignored failure: $e")
        }
      }
    }

    def onSuccess(reply: Any): Unit = reply match {
      case RpcFailure(e) => onFailure(e)
      case rpcReply =>
        if (!promise.trySuccess(rpcReply)) {
          logWarning(s"Ignored message: $reply")
        }
    }

    try {
      if (remoteAddr == address) {
        val p = Promise[Any]()
        p.future.onComplete {
          case Success(response) => onSuccess(response)
          case Failure(e) => onFailure(e)
        }(ThreadUtils.sameThread)
        dispatcher.postLocalMessage(message, p)
      } else {
        val rpcMessage = RpcOutboxMessage(message.serialize(this),
          onFailure,
          (client, response) => onSuccess(deserialize[Any](client, response)))
        postToOutbox(message.receiver, rpcMessage)
        promise.future.failed.foreach {
          case _: TimeoutException => rpcMessage.onTimeout()
          case _ =>
        }(ThreadUtils.sameThread)
      }

      val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
        override def run(): Unit = {
          onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
            s"in ${timeout.duration}"))
        }
      }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
      promise.future.onComplete { v =>
        timeoutCancelable.cancel(true)
      }(ThreadUtils.sameThread)
    } catch {
      case NonFatal(e) =>
        onFailure(e)
    }
    promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
  }

可见,ask()方法的执行分为两种情况:

  • 如果远端地址与当前NettyRpcEnv的地址相同,那么说明处理该消息的RPC端点就位于本地。这时就新建一个Promise对象,将其Future设置为回调方法(即onSuccess()和onFailure()方法),然后调用本地调度器的postLocalMessage()方法,将消息发送给本地RPC端点。
  • 如果远端地址与当前NettyRpcEnv的地址不同,那么说明处理该消息的RPC端点位于其他节点上。这时会将消息序列化,将它与onSuccess()、onFailure()方法逻辑一同封装到RpcOutboxMessage中投递出去。

最后,用前述timeoutScheduler设置一个定时线程,用来控制超时。超时后会抛出TimeoutException,如果没有超时,就调用cancel()方法取消计时。

send()方法

send()方法的作用则是“同步发送一条单向的消息,并且‘发送即忘记’(fire-and-forget),不需要回复”。其实现方法如下。

代码#10.7 - o.a.s.rpc.netty.NettyRpcEnv.send()方法

代码语言:javascript
复制
  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logDebug(e.getMessage)
      }
    } else {
      postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
    }
  }

这个方法的逻辑与ask()方法大致相同,也分为两种情况,只是细节有差别,不再赘述。

总结

本文通过研究NettyRpcEnv内与消息发送相关的逻辑,以及发件箱Outbox的消息处理逻辑,大致讲清了NettyRpcEnv作为RPC客户端的能力。

— THE END —

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • NettyRpcEnv与消息发送相关的成员
    • clientFactory、fileDownloadFactory
      • timeoutScheduler
        • clientConnectionExecutor
          • outboxes
          • 发件箱Outbox相关逻辑
            • OutboxMessage
              • 消息处理
                • 向Outbox投递消息
                • NettyRpcEnv发送消息的方法
                  • ask()方法
                    • send()方法
                    • 总结
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档