Spark中的顶层RPC环境架构及总结
RpcEndpoint
RpcEndpoint
是对能够处理RPC请求,给某一特定服务提供本地调用及跨节点调用的RPC组件的抽象,所有运行于RPC框架之上的实体都应该继承RpcEndpoint
,
具体构成比较简单,receive()
方法用于处理RpcEndpointRef.send()
和 RpcCallContext.reply()
发送的消息,receiveAndReply()
则是用于RpcEndpointRef.ask()
发送的消息。其他主要是提供了一些待实现的回调函数
- onError()
- onConnected()
- onDisconnected()
- onNetworkError()
- onStart()
- onStop()
无论是receive()
还是receiveAndReply()
都是返回一个偏函数,在方法体中用模式匹配来将消息用不同的回调函数处理
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
RpcEndpoint
一般通过匿名内部类和已经实现好的子类来使用,下图是一些子类,其中DummyMaster
用于测试。
ThreadSafeRpcEndpoint
是继承自RpcEndpoint
的特质,主要用于对消息的处理,必须是线程安全的场景。ThreadSafeRpcEndpoint
对消息的处理都是串行的,即前一条消息处理完才能接着处理下一条消息,如下图所示,其中TestRpcEndPoint
用于测试。
RpcEndpointRef
远程RpcEndpoint
引用,用于消息发送方持有并发送消息,如下图所示。
对于特质RpcEndpointRef
来说,有以下属性用来保证消息投递的一致性,at-most-once; at-least-once; exactly-once
maxRetries
:最大尝试连接次数。可以通过spark.rpc.numRetries
参数指定,默认3次retryWaitMs
:每次尝试连接最大等待毫秒值。可以通过spark.rpc.retry.wait
,默认3秒defaultAskTimeout
:RPC ask操作的超时时间。可以通过spark.rpc.askTimeout
,默认120秒
RpcEndpointRef
中还定义了一些未实现的方法,用于发送消息
send()
:发送单向异步的消息,发完即忘语义ask()
:**发送消息到相应的RpcEndpoint.receiveAndReply()
, 并返回Future
**以在默认超时或者自定义超时内接收返回值askSync()
:与ask()
类似,不过askSync()
直接返回接收消息并且是阻塞的方法
RpcEndpointRef
存在唯一实现类 NettyRpcEndpointRef
,重写的ask()
和send()
,首先将message封装为RequestMessage
然后再调用NettyRpcEnv
的ask()
和send()
方法。
RpcEnv
表示RPC环境,只有唯一子类NettyRpcEnv
。 包含以下重要成员对象
transportContext
:TransportContext
streamManager
:NettyStreamManager
dispatcher
:Dispatcher
server
:TransportServer
clientFactory
: 用于构造发送和接收响应的TransportClient
。private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
fileDownloadFactory
: 用于下载文件的TransportClient
。@volatile private var fileDownloadFactory: TransportClientFactory = _
outboxes
: 远端RPC地址与Outbox
的映射关系,使用ConcurrentHashMap
保证线程安全
包含以下重要方法
startServer()
: 通过TransportContext
的createServer()
方法创建TransportServer
。向Dispatcher
注册RpcEndpointVerifier
。RpcEndpointVerifier
用于校验指定名称的RpcEndpoint
是否存在。RpcEndpointVerifier
在Dispatcher
中的注册名为endpoint-verifierdef startServer(bindAddress: String, port: Int): Unit = { val bootstraps: java.util.List[TransportServerBootstrap] = if (securityManager.isAuthenticationEnabled()) { java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager)) } else { java.util.Collections.emptyList() } server = transportContext.createServer(bindAddress, port, bootstraps) dispatcher.registerRpcEndpoint( RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) }
ask()
: 询问- 如果请求消息的接收者的地址与当前
NettyRpcEnv
的地址相同,将消息交给dispatcher
的postLocalMessage()
方法处理,并传入成功和失败时的回调函数 - 如果请求消息的接收者的地址与当前
NettyRpcEnv
的地址不同,则将消息序列化与回调函数一起封装为RpcOutboxMessage
放入outbox
,将消息投递出去 - 设定定时器,并返回请求结果
private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = { val promise = Promise[Any]() val remoteAddr = message.receiver.address def onFailure(e: Throwable): Unit = { if (!promise.tryFailure(e)) { e match { case e : RpcEnvStoppedException => logDebug (s"Ignored failure: $e") case _ => logWarning(s"Ignored failure: $e") } } } def onSuccess(reply: Any): Unit = reply match { case RpcFailure(e) => onFailure(e) case rpcReply => if (!promise.trySuccess(rpcReply)) { logWarning(s"Ignored message: $reply") } } try { if (remoteAddr == address) { val p = Promise[Any]() p.future.onComplete { case Success(response) => onSuccess(response) case Failure(e) => onFailure(e) }(ThreadUtils.sameThread) dispatcher.postLocalMessage(message, p) } else { val rpcMessage = RpcOutboxMessage(message.serialize(this), onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) } val timeoutCancelable = timeoutScheduler.schedule(new Runnable { override def run(): Unit = { onFailure(new TimeoutException(s"Cannot receive any reply from ${remoteAddr} " + s"in ${timeout.duration}")) } }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) promise.future.onComplete { v => timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) } catch { case NonFatal(e) => onFailure(e) } promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) }
- 如果请求消息的接收者的地址与当前
send()
: 发送消息。与ask()
类似,都是本地消息交于Inbox
,远程消息交于outbox
private[netty] def send(message: RequestMessage): Unit = { val remoteAddr = message.receiver.address if (remoteAddr == address) { // Message to a local RPC endpoint. try { dispatcher.postOneWayMessage(message) } catch { case e: RpcEnvStoppedException => logDebug(e.getMessage) } } else { // Message to a remote RPC endpoint. postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this))) } }
postToOutbox()
: 消息投递到远端结点- 如果
receiver.client
不为空,那么消息将直接通过TransportClient
发送到远端节点 - 如果
receiver.client
为空,则获取远端结点地址对应的Outbox
,若没有则新建一个 - 如果
NettyRpcEnv
已经停止,移除该Outbox
并停止,否则调用Outbox.send()
发送消息
private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = { if (receiver.client != null) { message.sendWith(receiver.client) } else { require(receiver.address != null, "Cannot send message to client endpoint with no listen address.") val targetOutbox = { val outbox = outboxes.get(receiver.address) if (outbox == null) { val newOutbox = new Outbox(this, receiver.address) val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox) if (oldOutbox == null) { newOutbox } else { oldOutbox } } else { outbox } } if (stopped.get) { // It's possible that we put `targetOutbox` after stopping. So we need to clean it. outboxes.remove(receiver.address) targetOutbox.stop() } else { targetOutbox.send(message) } } }
- 如果
总结
如下图所示,客户端发送请求的流程图,左侧右侧分别表示两个不同节点上的NettyRpcEnv
- 通过调用
NettyRpcEndpointRef
的send()
和ask()
方法向本地节点的RpcEndpoint
发送消息。由于是在同一节点,所以直接调用Dispatcher
的postLocalMessage()
或postOneWayMessage()
方法将消息放入EndpointData
内部Inbox
的messages
中。MessageLoop
线程最后处理消息,并将消息发给对应的RpcEndpoint
处理。 - 通过调用
NettyRpcEndpointRef
的send()
和ask()
方法向远端节点的RpcEndpoint
发送消息。消息将首先被封装为OutboxMessage
,然后放入到远端RpcEndpoint
的地址所对应的Outbox
的messages
中。 - 每个
Outbox
的drainOutbox()
方法通过循环,不断从messages
列表中取得OutboxMessage
,并通过TransportClient
发送,底层依赖Netty
。 TransportClient
和远端NettyRpcEnv
的TransportServer
建立了连接后,请求消息首先经过Netty管道的处理,由TransportChannelHandler
将消息分发给TransportRequestHandler
,最终会调用NettyRpcHandler
或StreamManager
处理。如果是RPC消息则会调用NettyRpcHandler.receive()
方法,之后与第一步所述一致,调用Dispatcher
的postRemoteMessage()
或``postOneWayMessage()`方法。- 如果
TransportRequestHandler
处理的是RpcRequest
,那么server端的TransportRequestHandler
处理消息时还会对client端进行响应,依赖Netty
将响应消息发送给client端。client端接收到消息时由TransportChannelHandler
将消息分发给TransportResponseHandler
处理。
REFERENCE
- spark 源码分析
- Spark内核设计的艺术:架构设计与实现
文档信息
- 本文作者:wzx
- 本文链接:https://masterwangzx.com/2020/09/04/RpcEnv/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)