# Low-Level Stream Handling and Message Transport in Spark RPC
## StreamManager

`StreamManager` fetches individual chunks from a stream; `TransportRequestHandler` uses it to answer `fetchChunk()` requests. It has two subclasses: `OneForOneStreamManager` and `NettyStreamManager`.
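For orientation, here is a Scala-style paraphrase of that contract (the real `StreamManager` is a Java abstract class; the method set is abridged and quoted from memory, so treat this as a sketch rather than the actual file):

```scala
import io.netty.channel.Channel
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.TransportClient

// Sketch of the StreamManager contract (abridged paraphrase of the Java class).
abstract class StreamManagerSketch {
  // Serve one chunk of a registered stream; called once per ChunkFetchRequest.
  def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer

  // Serve a whole stream as a single ManagedBuffer; used for file/jar streams.
  def openStream(streamId: String): ManagedBuffer =
    throw new UnsupportedOperationException()

  // Release any state tied to a channel that has gone away.
  def connectionTerminated(channel: Channel): Unit = {}

  // Reject clients that are not allowed to read the given stream.
  def checkAuthorization(client: TransportClient, streamId: Long): Unit = {}
}
```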
`OneForOneStreamManager` provides a one-for-one stream service for `NettyBlockRpcServer`. A `ManagedBuffer` is an abstraction over an immutable byte array. The inner class `StreamState` maintains the state of a single stream, as the code below shows:

- `appId`: id of the application that requested the stream
- `buffers`: an iterator of `ManagedBuffer`s representing the stream's buffered data
- `associatedChannel`: the channel associated with the current stream
- `chunksBeingTransferred`: the number of `ManagedBuffer`s currently being transferred
- `curChunk`: index of the `ManagedBuffer` the client should receive next, used to verify that the caller requests chunks in order, one at a time
```java
private static class StreamState {
  final String appId;
  final Iterator<ManagedBuffer> buffers;
  final Channel associatedChannel;

  int curChunk = 0;
  volatile long chunksBeingTransferred = 0L;

  StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
    this.appId = appId;
    this.buffers = Preconditions.checkNotNull(buffers);
    this.associatedChannel = channel;
  }
}
```
`OneForOneStreamManager` has two important member fields:

- `nextStreamId`: id of the next stream; an `AtomicLong` makes id generation safe under concurrency.
- `streams`: the mapping from stream id to `StreamState`; a `ConcurrentHashMap<Long, StreamState>` makes it thread-safe.
Its important methods are the following.

`registerStream()`: registers a stream of `ManagedBuffer`s together with a channel.

```java
public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
  long myStreamId = nextStreamId.getAndIncrement();
  streams.put(myStreamId, new StreamState(appId, buffers, channel));
  return myStreamId;
}
```
`getChunk()`: fetches a single chunk wrapped as a `ManagedBuffer`. Once the stream reaches its end, the stream is removed; a small usage sketch follows the code.

```java
public ManagedBuffer getChunk(long streamId, int chunkIndex) {
  StreamState state = streams.get(streamId);
  if (chunkIndex != state.curChunk) {
    throw new IllegalStateException(String.format(
        "Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
  } else if (!state.buffers.hasNext()) {
    throw new IllegalStateException(String.format(
        "Requested chunk index beyond end %s", chunkIndex));
  }
  state.curChunk += 1;
  ManagedBuffer nextChunk = state.buffers.next();

  if (!state.buffers.hasNext()) {
    logger.trace("Removing stream id {}", streamId);
    streams.remove(streamId);
  }

  return nextChunk;
}
```
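To make the in-order, one-at-a-time contract concrete, here is a minimal, self-contained usage sketch. It relies only on the two methods shown above; `NioManagedBuffer` just wraps an in-memory `ByteBuffer`, and the `null` channel is a shortcut for the demo:

```scala
import java.nio.ByteBuffer
import scala.jdk.CollectionConverters._ // Scala 2.13
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.server.OneForOneStreamManager

object OneForOneDemo {
  def main(args: Array[String]): Unit = {
    val manager = new OneForOneStreamManager()

    // Two chunks backed by in-memory buffers; a real server would typically
    // register file- or Netty-backed ManagedBuffers instead.
    val chunks: Iterator[ManagedBuffer] = Iterator("chunk-0", "chunk-1")
      .map(s => new NioManagedBuffer(ByteBuffer.wrap(s.getBytes("UTF-8"))))

    // A null channel keeps the demo self-contained; getChunk() never touches it.
    val streamId = manager.registerStream("demo-app", chunks.asJava, null)

    println(manager.getChunk(streamId, 0).size()) // must ask for chunk 0 first
    println(manager.getChunk(streamId, 1).size()) // last chunk: stream is removed
    // manager.getChunk(streamId, 0) would now fail: the stream no longer exists.
  }
}
```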
`NettyStreamManager` provides a file stream service for `NettyRpcEnv`: it supports registering and downloading plain files, jar packages, and directories. `TransportRequestHandler` relies on `NettyStreamManager` to handle `StreamRequest` messages, so each Executor node can use the `NettyStreamManager` exposed by the Driver's RpcEnv to download jars or files from the Driver for task execution.
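The routing inside `NettyStreamManager` is easy to picture. Below is a simplified paraphrase (names follow the Spark source, but error handling is trimmed and details may differ across versions): stream ids look like `/files/<name>`, `/jars/<name>` or `/<dirId>/<relativePath>`, backed by three maps filled by `addFile()`, `addJar()` and `addDirectory()`.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.util.TransportConf

// Simplified paraphrase of NettyStreamManager's stream-id routing.
class FileStreamRoutingSketch(transportConf: TransportConf) {
  private val files = new ConcurrentHashMap[String, File]() // filled by addFile()
  private val jars  = new ConcurrentHashMap[String, File]() // filled by addJar()
  private val dirs  = new ConcurrentHashMap[String, File]() // filled by addDirectory()

  def openStream(streamId: String): ManagedBuffer = {
    val Array(ftype, fname) = streamId.stripPrefix("/").split("/", 2)
    val file = ftype match {
      case "files" => files.get(fname)
      case "jars"  => jars.get(fname)
      case dirId   => new File(dirs.get(dirId), fname)
    }
    require(file != null && file.isFile, s"File not found: $streamId")
    // The file is served as a single buffer over the segment [0, length).
    new FileSegmentManagedBuffer(transportConf, file, 0, file.length())
  }
}
```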
## RpcHandler

`RpcHandler` handles the request messages coming from `TransportRequestHandler`. Below we focus on its implementation class `NettyRpcHandler`.
`internalReceive()`: wraps a `ByteBuffer` into a `RequestMessage`. The remote address is obtained from the `TransportClient`, and the `ByteBuffer` is deserialized while the `RequestMessage` is constructed. If the message carries no sender address, the address previously obtained from the `TransportClient` is used instead; if it does carry one, a `RemoteProcessConnected` message is posted to every `Inbox`.

```scala
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
  val addr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
  assert(addr != null)
  val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  val requestMessage = RequestMessage(nettyEnv, client, message)
  if (requestMessage.senderAddress == null) {
    // Create a new message with the socket address of the client as the sender.
    new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
  } else {
    // The remote RpcEnv listens to some port, we should also fire a
    // RemoteProcessConnected for the listening address
    val remoteEnvAddress = requestMessage.senderAddress
    if (remoteAddresses.putIfAbsent(clientAddr, remoteEnvAddress) == null) {
      dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress))
    }
    requestMessage
  }
}

private[netty] object RequestMessage {

  private def readRpcAddress(in: DataInputStream): RpcAddress = {
    val hasRpcAddress = in.readBoolean()
    if (hasRpcAddress) {
      RpcAddress(in.readUTF(), in.readInt())
    } else {
      null
    }
  }

  def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
    val bis = new ByteBufferInputStream(bytes)
    val in = new DataInputStream(bis)
    try {
      val senderAddress = readRpcAddress(in)
      val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF())
      val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
      ref.client = client
      new RequestMessage(
        senderAddress,
        ref,
        // The remaining bytes in `bytes` are the message content.
        nettyEnv.deserialize(client, bytes))
    } finally {
      in.close()
    }
  }
}

// NettyRpcEnv.deserialize()
private[netty] def deserialize[T: ClassTag](client: TransportClient, bytes: ByteBuffer): T = {
  NettyRpcEnv.currentClient.withValue(client) {
    deserialize { () =>
      javaSerializerInstance.deserialize[T](bytes)
    }
  }
}
```
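For completeness, the write side that produces exactly the header `readRpcAddress()` and `apply()` consume looks roughly like the following (paraphrased from `RequestMessage.serialize()` in `NettyRpcEnv.scala`; quoted from memory, so the field order is the point, not the exact code):

```scala
// Paraphrased sketch of the sender side: a header (addresses + endpoint name)
// followed by the serialized message content.
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = new DataOutputStream(bos)
  try {
    writeRpcAddress(out, senderAddress)    // Boolean flag, then host + port
    writeRpcAddress(out, receiver.address) // address of the receiving RpcEnv
    out.writeUTF(receiver.name)            // endpoint name
    val s = nettyEnv.serializeStream(out)  // content comes after the header
    try {
      s.writeObject(content)
    } finally {
      s.close()
    }
  } finally {
    out.close()
  }
  bos.toByteBuffer
}

private def writeRpcAddress(out: DataOutputStream, rpcAddress: RpcAddress): Unit = {
  if (rpcAddress == null) {
    out.writeBoolean(false) // matches the hasRpcAddress flag read above
  } else {
    out.writeBoolean(true)
    out.writeUTF(rpcAddress.host)
    out.writeInt(rpcAddress.port)
  }
}
```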
`receive()`: handles an RPC message sent by a `TransportClient`. Under the hood, the message is handed to the `Dispatcher`, which puts it into the target endpoint's `Inbox` (a sketch of the posting methods follows the code).

```scala
// Dispatched to RpcEndpoint.receiveAndReply()
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}

// Dispatched to RpcEndpoint.receive()
override def receive(
    client: TransportClient,
    message: ByteBuffer): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postOneWayMessage(messageToDispatch)
}
```
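What `postRemoteMessage()` and `postOneWayMessage()` do on the `Dispatcher` side looks roughly like this (paraphrased sketch; the key point is that both wrap the payload in an `InboxMessage` and post it to the receiving endpoint's `Inbox`):

```scala
// Paraphrased sketch of the two Dispatcher entry points used above.
def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  // The callback lets the endpoint send its reply back over the network.
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, e => callback.onFailure(e))
}

def postOneWayMessage(message: RequestMessage): Unit = {
  // Fire-and-forget: ends up in RpcEndpoint.receive(), no reply expected.
  postMessage(message.receiver.name,
    OneWayMessage(message.senderAddress, message.content), e => throw e)
}
```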
The remaining callbacks are straightforward:

- `getStreamManager()`: returns the `StreamManager`, from which, as described in the previous section, individual chunks can be fetched.
- `channelActive()`: posts a `RemoteProcessConnected` message to the `Inbox`.
- `channelInactive()`: posts a `RemoteProcessDisconnected` message to the `Inbox` (see the sketch after this list).
- `exceptionCaught()`: posts a `RemoteProcessConnectionError` message to the `Inbox`.
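As an example of these callbacks, `channelInactive()` looks roughly like this (paraphrased sketch; details vary across Spark versions). It reports the disconnect twice when the remote RpcEnv's listening address is known, because endpoints may track the peer by either address:

```scala
// Paraphrased sketch of NettyRpcHandler.channelInactive().
override def channelInactive(client: TransportClient): Unit = {
  val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  if (addr != null) {
    val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
    nettyEnv.removeOutbox(clientAddr) // drop queued outgoing messages for this peer
    dispatcher.postToAll(RemoteProcessDisconnected(clientAddr))
    val remoteEnvAddress = remoteAddresses.remove(clientAddr)
    if (remoteEnvAddress != null) {
      // The remote RpcEnv also listens on its own advertised address;
      // report the disconnect for that address too.
      dispatcher.postToAll(RemoteProcessDisconnected(remoteEnvAddress))
    }
  }
}
```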