Spark源码阅读(五):RPC之RpcEnv、RpcEndpoint、RpcEndpointRef

2020/09/04 Spark 共 6211 字,约 18 分钟

Spark中的顶层RPC环境架构及总结

RpcEndpoint

RpcEndpoint是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint

具体构成比较简单,receive()方法用于处理RpcEndpointRef.send()RpcCallContext.reply()发送的消息,receiveAndReply()则是用于RpcEndpointRef.ask()发送的消息。其他主要是提供了一些待实现的回调函数

  • onError()
  • onConnected()
  • onDisconnected()
  • onNetworkError()
  • onStart()
  • onStop()

无论是receive()还是receiveAndReply()都是返回一个偏函数,在方法体中用模式匹配来将消息用不同的回调函数处理

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}

RpcEndpoint一般通过匿名内部类和已经实现好的子类来使用,下图是一些子类,其中DummyMaster用于测试。

ThreadSafeRpcEndpoint是继承自RpcEndpoint的特质,主要用于对消息的处理,必须是线程安全的场景。ThreadSafeRpcEndpoint对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息,如下图所示,其中TestRpcEndPoint用于测试。

RpcEndpointRef

远程RpcEndpoint引用,用于消息发送方持有并发送消息,如下图所示。

对于特质RpcEndpointRef来说,有以下属性用来保证消息投递的一致性,at-most-once; at-least-once; exactly-once

  • maxRetries最大尝试连接次数。可以通过spark.rpc.numRetries参数指定,默认3次
  • retryWaitMs每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait,默认3秒
  • defaultAskTimeoutRPC ask操作的超时时间。可以通过spark.rpc.askTimeout,默认120秒

RpcEndpointRef中还定义了一些未实现的方法,用于发送消息

  • send()发送单向异步的消息,发完即忘语义
  • ask():**发送消息到相应的 RpcEndpoint.receiveAndReply() , 并返回 Future **以在默认超时或者自定义超时内接收返回值
  • askSync():与ask()类似,不过askSync()直接返回接收消息并且是阻塞的方法

RpcEndpointRef 存在唯一实现类 NettyRpcEndpointRef,重写的ask()send(),首先将message封装为RequestMessage然后再调用NettyRpcEnvask()send()方法。

RpcEnv

表示RPC环境,只有唯一子类NettyRpcEnv。 包含以下重要成员对象

  • transportContext: TransportContext
  • streamManager: NettyStreamManager
  • dispatcher: Dispatcher
  • server: TransportServer
  • clientFactory: 用于构造发送和接收响应的TransportClientprivate val clientFactory = transportContext.createClientFactory(createClientBootstraps())
  • fileDownloadFactory: 用于下载文件的TransportClient@volatile private var fileDownloadFactory: TransportClientFactory = _
  • outboxes: 远端RPC地址与Outbox的映射关系,使用ConcurrentHashMap保证线程安全

包含以下重要方法

  • startServer(): 通过TransportContextcreateServer()方法创建TransportServer。向Dispatcher注册RpcEndpointVerifierRpcEndpointVerifier用于校验指定名称的RpcEndpoint是否存在。RpcEndpointVerifierDispatcher中的注册名为endpoint-verifier

    def startServer(bindAddress: String, port: Int): Unit = {
      val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
      server = transportContext.createServer(bindAddress, port, bootstraps)
      dispatcher.registerRpcEndpoint(
        RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
    }
    
  • ask(): 询问

    • 如果请求消息的接收者的地址与当前NettyRpcEnv的地址相同,将消息交给dispatcherpostLocalMessage()方法处理,并传入成功和失败时的回调函数
    • 如果请求消息的接收者的地址与当前NettyRpcEnv的地址不同,则将消息序列化与回调函数一起封装为RpcOutboxMessage放入outbox,将消息投递出去
    • 设定定时器,并返回请求结果
    private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
      val promise = Promise[Any]()
      val remoteAddr = message.receiver.address
    
      def onFailure(e: Throwable): Unit = {
        if (!promise.tryFailure(e)) {
          e match {
            case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e")
            case _ => logWarning(s"Ignored failure: $e")
          }
        }
      }
    
      def onSuccess(reply: Any): Unit = reply match {
        case RpcFailure(e) => onFailure(e)
        case rpcReply =>
        if (!promise.trySuccess(rpcReply)) {
          logWarning(s"Ignored message: $reply")
        }
      }
    
      try {
        if (remoteAddr == address) {
          val p = Promise[Any]()
          p.future.onComplete {
            case Success(response) => onSuccess(response)
            case Failure(e) => onFailure(e)
          }(ThreadUtils.sameThread)
          dispatcher.postLocalMessage(message, p)
        } else {
          val rpcMessage = RpcOutboxMessage(message.serialize(this),
                                            onFailure,
                                            (client, response) => onSuccess(deserialize[Any](client, response)))
          postToOutbox(message.receiver, rpcMessage)
          promise.future.failed.foreach {
            case _: TimeoutException => rpcMessage.onTimeout()
            case _ =>
          }(ThreadUtils.sameThread)
        }
    
        val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
          override def run(): Unit = {
            onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " +
                                           s"in ${timeout.duration}"))
          }
        }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
        promise.future.onComplete { v =>
          timeoutCancelable.cancel(true)
        }(ThreadUtils.sameThread)
      } catch {
        case NonFatal(e) =>
        onFailure(e)
      }
      promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
    }
    
  • send(): 发送消息。与ask()类似,都是本地消息交于Inbox,远程消息交于outbox

    private[netty] def send(message: RequestMessage): Unit = {
      val remoteAddr = message.receiver.address
      if (remoteAddr == address) {
        // Message to a local RPC endpoint.
        try {
          dispatcher.postOneWayMessage(message)
        } catch {
          case e: RpcEnvStoppedException => logDebug(e.getMessage)
        }
      } else {
        // Message to a remote RPC endpoint.
        postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
      }
    }
    
  • postToOutbox(): 消息投递到远端结点

    • 如果receiver.client不为空,那么消息将直接通过TransportClient发送到远端节点
    • 如果receiver.client为空,则获取远端结点地址对应的Outbox,若没有则新建一个
    • 如果NettyRpcEnv已经停止,移除该Outbox并停止,否则调用Outbox.send()发送消息
    private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
      if (receiver.client != null) {
        message.sendWith(receiver.client)
      } else {
        require(receiver.address != null,
                "Cannot send message to client endpoint with no listen address.")
        val targetOutbox = {
          val outbox = outboxes.get(receiver.address)
          if (outbox == null) {
            val newOutbox = new Outbox(this, receiver.address)
            val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
            if (oldOutbox == null) {
              newOutbox
            } else {
              oldOutbox
            }
          } else {
            outbox
          }
        }
        if (stopped.get) {
          // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
          outboxes.remove(receiver.address)
          targetOutbox.stop()
        } else {
          targetOutbox.send(message)
        }
      }
    }
    

总结

如下图所示,客户端发送请求的流程图,左侧右侧分别表示两个不同节点上的NettyRpcEnv

  1. 通过调用NettyRpcEndpointRefsend()ask()方法向本地节点的RpcEndpoint发送消息。由于是在同一节点,所以直接调用DispatcherpostLocalMessage()postOneWayMessage()方法将消息放入EndpointData内部Inboxmessages中。MessageLoop线程最后处理消息,并将消息发给对应的RpcEndpoint处理。
  2. 通过调用NettyRpcEndpointRefsend()ask()方法向远端节点的RpcEndpoint发送消息。消息将首先被封装为OutboxMessage,然后放入到远端RpcEndpoint的地址所对应的Outboxmessages中。
  3. 每个OutboxdrainOutbox()方法通过循环,不断从messages列表中取得OutboxMessage,并通过TransportClient发送,底层依赖Netty
  4. TransportClient和远端NettyRpcEnvTransportServer建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler将消息分发给TransportRequestHandler,最终会调用NettyRpcHandlerStreamManager处理。如果是RPC消息则会调用NettyRpcHandler.receive()方法,之后与第一步所述一致,调用DispatcherpostRemoteMessage()或``postOneWayMessage()`方法。
  5. 如果TransportRequestHandler处理的是RpcRequest,那么server端的TransportRequestHandler处理消息时还会对client端进行响应,依赖Netty将响应消息发送给client端。client端接收到消息时由TransportChannelHandler将消息分发给TransportResponseHandler处理。

REFERENCE

  1. spark 源码分析
  2. Spark内核设计的艺术:架构设计与实现

文档信息

Search

    Table of Contents