Spark Source Code Reading (11): Block Transfer in the Storage System

2020-09-11
wzx

An introduction to block transfer in Spark.

DiskBlockObjectWriter

Writes JVM objects directly to a disk file, and allows appending data to an existing block. The important member fields are:

  • bufferSize: buffer size for the output stream
  • syncWrites: whether to force writes to disk on commit
  • blockId: the BlockId being written
  • channel: the FileChannel of the underlying file
  • mcs: ManualCloseOutputStream
  • bs: OutputStream (the stream after compression/encryption wrapping)
  • fos: FileOutputStream
  • objOut: SerializationStream
  • ts: TimeTrackingOutputStream
  • initialized: whether the writer has been initialized
  • streamOpen: whether the stream is open
  • hasBeenClosed: whether the writer has been closed
  • committedPosition: the committed file position
  • reportedPosition: the file position last reported to the metrics system
  • numRecordsWritten: the number of records written

The important methods are described below; a usage sketch of the full write/commit cycle follows the list.

  • open(): opens the output streams and channel for the file being written (initialize() is sketched after the code below)

    def open(): DiskBlockObjectWriter = {
      if (hasBeenClosed) {
        throw new IllegalStateException("Writer already closed. Cannot be reopened.")
      }
      if (!initialized) {
        initialize()
        initialized = true
      }
      
      bs = serializerManager.wrapStream(blockId, mcs)
      objOut = serializerInstance.serializeStream(bs)
      streamOpen = true
      this
    }
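    
    open() relies on initialize() to create the low-level file streams referenced above (fos, channel, ts, mcs); condensed from the Spark source:
    
    private def initialize(): Unit = {
      fos = new FileOutputStream(file, true)   // open in append mode
      channel = fos.getChannel()
      ts = new TimeTrackingOutputStream(writeMetrics, fos)
      // a BufferedOutputStream whose close() is a no-op; real closing happens via manualClose()
      class ManualCloseBufferedOutputStream
        extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
      mcs = new ManualCloseBufferedOutputStream
    }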
    
  • recordWritten(): tracks the number of records written; every 16384 (2^14) records it reconciles the bytes-written metric with the actual channel position

    private def updateBytesWritten() {
      val pos = channel.position()
      writeMetrics.incBytesWritten(pos - reportedPosition)
      reportedPosition = pos
    }
      
    def recordWritten(): Unit = {
      numRecordsWritten += 1
      writeMetrics.incRecordsWritten(1)
      
      // 2^14
      if (numRecordsWritten % 16384 == 0) {
        updateBytesWritten()
      }
    }
    
  • write(): writes data to the output stream, opening it first if necessary

    override def write(kvBytes: Array[Byte], offs: Int, len: Int): Unit = {
      if (!streamOpen) {
        open()
      }
      
      bs.write(kvBytes, offs, len)
    }
    
  • commitAndGet(): flushes the data buffered in the output streams to the file and returns a FileSegment; if syncWrites is set, it forces outstanding writes to disk before measuring the committed position

    def commitAndGet(): FileSegment = {
      if (streamOpen) {
        // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
        //       serializer stream and the lower level stream.
        objOut.flush()
        bs.flush()
        objOut.close()
        streamOpen = false
      
        if (syncWrites) {
          // Force outstanding writes to disk and track how long it takes
          val start = System.nanoTime()
          fos.getFD.sync()
          writeMetrics.incWriteTime(System.nanoTime() - start)
        }
      
        val pos = channel.position()
        val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
        committedPosition = pos
        // In certain compression codecs, more bytes are written after streams are closed
        writeMetrics.incBytesWritten(committedPosition - reportedPosition)
        reportedPosition = committedPosition
        numRecordsWritten = 0
        fileSegment
      } else {
        new FileSegment(file, committedPosition, 0)
      }
    }
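
A hedged usage sketch of the full cycle, assuming a writer obtained via BlockManager.getDiskWriter() and key-value records supplied by the surrounding component:

// obtain a writer for `file`: 32 KB buffer, metrics sink provided by the caller
val writer: DiskBlockObjectWriter =
  blockManager.getDiskWriter(blockId, file, serializerInstance, 32 * 1024, writeMetrics)

records.foreach { case (key, value) =>
  writer.write(key, value)  // lazily open()s the streams, serializes the pair, calls recordWritten()
}

// flush the buffers, fsync if syncWrites is set, and obtain the segment just written
val segment: FileSegment = writer.commitAndGet()
writer.close()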
    

BlockManagerMessages

The messages sent and received by BlockManagerMaster; a sketch of how these messages are sent follows the list:

  • RemoveExecutor: remove an executor
  • RegisterBlockManager: register a BlockManager
  • UpdateBlockInfo: update the information of a block
  • GetLocations: get the locations of a block
  • GetLocationsMultipleBlockIds: get the locations of multiple blocks
  • GetPeers: get the BlockManagerIds of the other BlockManagers
  • GetExecutorEndpointRef: get the RpcEndpointRef of an executor's endpoint
  • RemoveBlock: remove a block
  • RemoveRdd: remove the blocks of an RDD
  • RemoveShuffle: remove the blocks of a shuffle
  • RemoveBroadcast: remove the blocks of a broadcast
  • GetMemoryStatus: get the memory status of the BlockManagers
  • GetStorageStatus: get the storage status
  • GetBlockStatus: get the status of a block
  • GetMatchingBlockIds: get the blocks matching a filter
  • HasCachedBlocks: whether a given executor has cached blocks
  • StopBlockManagerMaster: stop BlockManagerMaster
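
A minimal sketch, modeled on BlockManagerMaster in the Spark source, of how these case-class messages are sent: driver-side helper methods wrap them in synchronous asks on the master endpoint.

def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))
}

def removeExecutor(execId: String): Unit = {
  // tell() asserts that askSync[Boolean] on the driver endpoint returned true
  tell(RemoveExecutor(execId))
}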

BlockManagerMasterEndpoint

Extends ThreadSafeRpcEndpoint from the RPC components described earlier. It has the following important member fields:

  • blockManagerInfo: mapping from BlockManagerId to BlockManagerInfo
  • blockManagerIdByExecutor: mapping from executor ID to BlockManagerId
  • blockLocations: mapping from BlockId to the set of BlockManagerIds holding it, i.e. where each block is stored
  • topologyMapper: the topology mapping of all nodes in the cluster

It overrides the receiveAndReply() method of the RpcEndpoint trait, and uses pattern matching to dispatch each message in BlockManagerMessages to the corresponding handler method.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
    context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
    context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
    listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))

  case GetLocations(blockId) =>
    context.reply(getLocations(blockId))

  ......

}
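
Most handlers simply consult the maps above; for example, getLocations() (condensed from the Spark source) just reads blockLocations:

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}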

BlockManagerSlaveEndpoint

Has the same inheritance hierarchy as BlockManagerMasterEndpoint, and likewise overrides the receiveAndReply() method of the RpcEndpoint trait, using pattern matching to dispatch the messages in BlockManagerMessages to the corresponding handler methods.
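
A condensed sketch, following the Spark source, of the slave side: requests that mutate local storage are executed asynchronously on a dedicated thread pool, so the RPC thread is not blocked by disk or memory cleanup.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, context) {
      blockManager.removeBlock(blockId)
      true
    }

  case GetBlockStatus(blockId, _) =>
    context.reply(blockManager.getStatus(blockId))
}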

OneForOneBlockFetcher

Each chunk corresponds to one block. OneForOneBlockFetcher is invoked by BlockTransferService to carry out block downloads. Its important member fields are:

  • client: TransportClient
  • openMessage: an OpenBlocks message holding the remote appId, execId (the executor identifier) and blockIds
  • blockIds: a string array, consistent with openMessage.blockIds
  • listener: a BlockFetchingListener, called back when a block fetch succeeds or fails
  • chunkCallback: a callback invoked when a single chunk fetch succeeds or fails, used together with the BlockFetchingListener (a sketch follows the code below)

The main start() method, which kicks off the fetch flow:

  • The client sends the openMessage to the server; the response callback is placed in the client-side cache, as covered in an earlier post
  • In that callback, onSuccess() calls client.fetchChunk() (or client.stream() when a DownloadFileManager is present) to fetch all the blocks
public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }

  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
        logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

        // Immediately request all chunks -- we expect that the total size of the request is
        // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
        for (int i = 0; i < streamHandle.numChunks; i++) {
          if (downloadFileManager != null) {
            client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                          new DownloadCallback(i));
          } else {
            client.fetchChunk(streamHandle.streamId, i, chunkCallback);
          }
        }
      } catch (Exception e) {
        logger.error("Failed while starting block fetches after success", e);
        failRemainingBlocks(blockIds, e);
      }
    }

    @Override
    public void onFailure(Throwable e) {
      logger.error("Failed while starting block fetches", e);
      failRemainingBlocks(blockIds, e);
    }
  });
}

private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
  for (String blockId : failedBlockIds) {
    try {
      listener.onBlockFetchFailure(blockId, e);
    } catch (Exception e2) {
      logger.error("Error in block fetch failure callback", e2);
    }
  }
}
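
A Scala rendering of the Java ChunkCallback inner class mentioned above: a fetched chunk is reported upwards as the block at the same index, while a failed chunk fails every block from that index onwards.

private class ChunkCallback extends ChunkReceivedCallback {
  override def onSuccess(chunkIndex: Int, buffer: ManagedBuffer): Unit = {
    // chunk index i corresponds to blockIds(i) from the StreamHandle
    listener.onBlockFetchSuccess(blockIds(chunkIndex), buffer)
  }

  override def onFailure(chunkIndex: Int, e: Throwable): Unit = {
    // blocks before chunkIndex have already been delivered; only the rest fail
    failRemainingBlocks(blockIds.drop(chunkIndex), e)
  }
}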

NettyBlockRpcServer

Extends RpcHandler and handles the request messages received by the TransportRequestHandler inside NettyBlockTransferService, acting as the server side of shuffle.

It overrides receive() and receiveStream(); the latter handles streamed uploads via BlockManager.putBlockDataAsStream().

receive(): receives and handles the OpenBlocks and UploadBlock messages

  • On OpenBlocks: extract the requested BlockIds and obtain the (serialized) block data via BlockManager.getBlockData(), yielding a sequence of blocks wrapped as ManagedBuffers
  • Call OneForOneStreamManager.registerStream() to register the block sequence, together with the app id and the channel, into streams; the streamId is generated from an incrementing counter
  • Build a StreamHandle message (containing the streamId and the number of ManagedBuffers in the sequence) and reply to the client through the response context
  • On UploadBlock: parse the metadata of the request to recover the storage level, the class tag and the BlockId
  • Call BlockManager.putBlockData() to write the block into the local storage system
  • Reply to the client through the response context
override def receive(
    client: TransportClient,
    rpcMessage: ByteBuffer,
    responseContext: RpcResponseCallback): Unit = {
  val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  logTrace(s"Received request: $message")

  message match {
    case openBlocks: OpenBlocks =>
      val blocksNum = openBlocks.blockIds.length
      val blocks = for (i <- (0 until blocksNum).view)
        yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
      val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,
        client.getChannel)
      logTrace(s"Registered streamId $streamId with $blocksNum buffers")
      responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

    case uploadBlock: UploadBlock =>
      // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
      val (level: StorageLevel, classTag: ClassTag[_]) = {
        serializer
          .newInstance()
          .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
          .asInstanceOf[(StorageLevel, ClassTag[_])]
      }
      val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
      val blockId = BlockId(uploadBlock.blockId)
      logDebug(s"Receiving replicated block $blockId with level ${level} " +
        s"from ${client.getSocketAddress}")
      blockManager.putBlockData(blockId, data, level, classTag)
      responseContext.onSuccess(ByteBuffer.allocate(0))
  }
}

BlockTransferService

Used to initiate block upload and download requests, acting as the client side of shuffle. The two concrete methods are:

  • fetchBlockSync(): downloads a block synchronously. It essentially calls the abstract fetchBlocks() and waits for the BlockFetchingListener to deliver the fetched block data; the wait spans two stages (requesting the blocks, then pulling the chunk data; see the Summary section)

    def fetchBlockSync(
        host: String,
        port: Int,
        execId: String,
        blockId: String,
        tempFileManager: DownloadFileManager): ManagedBuffer = {
      // A monitor for the thread to wait on.
      val result = Promise[ManagedBuffer]()
      fetchBlocks(host, port, execId, Array(blockId),
        new BlockFetchingListener {
          override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
            result.failure(exception)
          }
          override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
            data match {
              case f: FileSegmentManagedBuffer =>
                result.success(f)
              case e: EncryptedManagedBuffer =>
                result.success(e)
              case _ =>
                try {
                  val ret = ByteBuffer.allocate(data.size.toInt)
                  ret.put(data.nioByteBuffer())
                  ret.flip()
                  result.success(new NioManagedBuffer(ret))
                } catch {
                  case e: Throwable => result.failure(e)
                }
            }
          }
        }, tempFileManager)
      ThreadUtils.awaitResult(result.future, Duration.Inf)
    }
    
  • uploadBlockSync(): uploads a block synchronously. It calls the abstract uploadBlock() method; the synchronization works the same way as in fetchBlockSync(), except that the Promise is created inside the uploadBlock() method itself

    def uploadBlockSync(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {
      val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
      ThreadUtils.awaitResult(future, Duration.Inf)
    }
    

BlockTransferService has two implementations: MockBlockTransferService (used in tests) and NettyBlockTransferService.

The rest of this section covers NettyBlockTransferService. It has the following important members:

  • transportContext: TransportContext
  • server: TransportServer

Its important methods are:

  • init(): NettyBlockTransferService only provides service after its init() method has been called, i.e. after it has been initialized

    private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
      def startService(port: Int): (TransportServer, Int) = {
        val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
        (server, server.getPort)
      }
      
      Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
    }
      
    override def init(blockDataManager: BlockDataManager): Unit = {
      val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
      var serverBootstrap: Option[TransportServerBootstrap] = None
      var clientBootstrap: Option[TransportClientBootstrap] = None
      if (authEnabled) {
        serverBootstrap = Some(new AuthServerBootstrap(transportConf, securityManager))
        clientBootstrap = Some(new AuthClientBootstrap(transportConf, conf.getAppId, securityManager))
      }
      transportContext = new TransportContext(transportConf, rpcHandler)
      clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava)
      server = createServer(serverBootstrap.toList)
      appId = conf.getAppId
      logInfo(s"Server created on ${hostName}:${server.getPort}")
    }
    
  • fetchBlocks(): sends requests to download blocks from a remote node

    • Creates a RetryingBlockFetcher.BlockFetchStarter. If the maximum number of retries is greater than 0, a RetryingBlockFetcher is created to issue the request; otherwise the blockFetchStarter's createAndStart() method is called directly, which creates a TransportClient, builds a OneForOneBlockFetcher, and calls its start()

      override def fetchBlocks(
        host: String,
        port: Int,
        execId: String,
        blockIds: Array[String],
        listener: BlockFetchingListener,
        tempFileManager: DownloadFileManager): Unit = {
        logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
        try {
          val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
            override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
              val client = clientFactory.createClient(host, port)
              new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
                                        transportConf, tempFileManager).start()
            }
          }
      
          val maxRetries = transportConf.maxIORetries()
          if (maxRetries > 0) {
            // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
            // a bug in this code. We should remove the if statement once we're sure of the stability.
            new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
          } else {
            blockFetchStarter.createAndStart(blockIds, listener)
          }
        } catch {
          case e: Exception =>
            logError("Exception while beginning fetchBlocks", e)
            blockIds.foreach(listener.onBlockFetchFailure(_, e))
        }
      }
      
    • RetryingBlockFetcher.start() in fact calls fetchAllOutstanding(). The part that issues the block fetch request is simply the createAndStart() method of the blockFetchStarter passed in from fetchBlocks(), so the request code is identical to the non-retrying path

      // RetryingBlockFetcher
      public void start() {
        fetchAllOutstanding();
      }
      
      private void fetchAllOutstanding() {
        // Start by retrieving our shared state within a synchronized block.
        String[] blockIdsToFetch;
        int numRetries;
        RetryingBlockFetchListener myListener;
        synchronized (this) {
          blockIdsToFetch = outstandingBlocksIds.toArray(new String[outstandingBlocksIds.size()]);
          numRetries = retryCount;
          myListener = currentListener;
        }
      
        // Now initiate the fetch on all outstanding blocks, possibly initiating a retry if that fails.
        try {
          fetchStarter.createAndStart(blockIdsToFetch, myListener);
        } catch (Exception e) {
          logger.error(String.format("Exception while beginning fetch of %s outstanding blocks %s",
                                     blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
      
          if (shouldRetry(e)) {
            initiateRetry();
          } else {
            for (String bid : blockIdsToFetch) {
              listener.onBlockFetchFailure(bid, e);
            }
          }
        }
      }
      
    • If an exception occurs, shouldRetry() decides whether to retry: the exception (or its cause) must be an IOException, and the current retry count retryCount must be less than the maximum maxRetries. initiateRetry() then performs the retry: it increments the retry count, creates a new RetryingBlockFetchListener, and submits the retry task to a thread pool (Executors.newCachedThreadPool), which waits for retryWaitTime and then calls fetchAllOutstanding() again; the retry mechanism is therefore asynchronous. The listener swap is sketched after the code below

      // RetryingBlockFetcher
      private synchronized boolean shouldRetry(Throwable e) {
        boolean isIOException = e instanceof IOException
        || (e.getCause() != null && e.getCause() instanceof IOException);
        boolean hasRemainingRetries = retryCount < maxRetries;
        return isIOException && hasRemainingRetries;
      }
      
      private synchronized void initiateRetry() {
        retryCount += 1;
        currentListener = new RetryingBlockFetchListener();
      
        logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
                    retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
      
        executorService.submit(() -> {
          Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
          fetchAllOutstanding();
        });
      }
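      
      Swapping in a fresh RetryingBlockFetchListener lets responses from the abandoned attempt be ignored safely. A condensed Scala rendering of the guard inside the (Java) RetryingBlockFetchListener inner class:
      
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        // forward only if this is still the current listener and the block is outstanding
        val shouldForward = RetryingBlockFetcher.this.synchronized {
          if (this == currentListener && outstandingBlocksIds.contains(blockId)) {
            outstandingBlocksIds.remove(blockId)
            true
          } else {
            false
          }
        }
        if (shouldForward) {
          listener.onBlockFetchSuccess(blockId, data)
        }
      }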
      
  • uploadBlock(): sends a request to upload a block to a remote node

    • Create a TransportClient
    • Serialize the storage level and class tag
    • If the block size exceeds spark.maxRemoteBlockSizeFetchToMem, call TransportClient.uploadStream() to upload the block as a stream (the server-side handling is sketched after the code below)
    • Otherwise call TransportClient.sendRpc() to upload the block data directly as a byte array
    override def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Future[Unit] = {
      val result = Promise[Unit]()
      val client = clientFactory.createClient(hostname, port)
    
      // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
      // Everything else is encoded using our binary protocol.
      val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))
    
      val asStream = blockData.size() > conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
      val callback = new RpcResponseCallback {
        override def onSuccess(response: ByteBuffer): Unit = {
          logTrace(s"Successfully uploaded block $blockId${if (asStream) " as stream" else ""}")
          result.success((): Unit)
        }
    
        override def onFailure(e: Throwable): Unit = {
          logError(s"Error while uploading $blockId${if (asStream) " as stream" else ""}", e)
          result.failure(e)
        }
      }
      if (asStream) {
        val streamHeader = new UploadBlockStream(blockId.name, metadata).toByteBuffer
        client.uploadStream(new NioManagedBuffer(streamHeader), blockData, callback)
      } else {
        // Convert or copy nio buffer into array in order to serialize it.
        val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())
    
        client.sendRpc(new UploadBlock(appId, execId, blockId.name, metadata, array).toByteBuffer,
                       callback)
      }
    
      result.future
    }
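    
    The server-side counterpart of uploadStream() is NettyBlockRpcServer.receiveStream(); a condensed sketch following the Spark source, which parses the UploadBlockStream header and hands the payload to the storage system as a stream, so a large block never has to be buffered in memory in one piece:
    
    override def receiveStream(
        client: TransportClient,
        messageHeader: ByteBuffer,
        responseContext: RpcResponseCallback): StreamCallbackWithID = {
      val message =
        BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
      // StorageLevel and ClassTag are serialized as bytes using the JavaSerializer
      val (level: StorageLevel, classTag: ClassTag[_]) = {
        serializer
          .newInstance()
          .deserialize(ByteBuffer.wrap(message.metadata))
          .asInstanceOf[(StorageLevel, ClassTag[_])]
      }
      val blockId = BlockId(message.blockId)
      blockManager.putBlockDataAsStream(blockId, level, classTag)
    }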
    

Summary

The flow for asynchronously fetching remote blocks is as follows:

  1. The client calls NettyBlockTransferService.fetchBlocks() to fetch blocks. If a retry count is configured, RetryingBlockFetcher.start() is called, which internally invokes BlockFetchStarter.createAndStart() and retries asynchronously on failure; the BlockFetchingListener passed in supplies the success/failure callbacks invoked once blocks have arrived locally (observer pattern)
  2. If no retry count is configured, BlockFetchStarter.createAndStart() is called directly and failures are not retried
  3. BlockFetchStarter.createAndStart() calls OneForOneBlockFetcher.start()
  4. OneForOneBlockFetcher.start() calls TransportClient.sendRpc() to send an OpenBlocks message, and the callback that will handle the server's response is registered in the TransportResponseHandler
  5. The server-side TransportRequestHandler receives the client's message and hands the OpenBlocks message to NettyBlockRpcServer
  6. NettyBlockRpcServer.receive() calls BlockManager.getBlockData() in a loop to obtain the sequence of block data from the local storage system
  7. It then calls OneForOneStreamManager.registerStream() to register the block data sequence as a stream
  8. A StreamHandle containing the stream id and the size of the block data sequence is returned to the client through the success callback
  9. The client-side TransportResponseHandler receives the server's response and invokes the previously registered callback, which iterates over all the chunk ids of the returned stream id (a chunk here means the same thing as a block) and calls TransportClient.fetchChunk() for each
  10. Each fetchChunk() call proceeds much like steps 1-9: the client issues a request to the server, and the server returns the block data. Concretely, the remote node looks up the StreamState corresponding to the streamId in its streams cache and returns the ManagedBuffer (the block data) at the given index of the StreamState's ManagedBuffer sequence. Finally, the BlockFetchingListener registered in steps 1 and 2 is invoked to process the fetched block data

The flow for asynchronously uploading a block to a remote node is as follows:

  1. The client obtains the block data through BlockManager and converts it into a ManagedBuffer
  2. It calls NettyBlockTransferService.uploadBlock() to upload the block to the remote node
  3. Internally, TransportClient.sendRpc() sends an UploadBlock message, and the callback that will handle the server's response is registered in the TransportResponseHandler
  4. The server-side TransportRequestHandler receives the client's message and hands the UploadBlock message to NettyBlockRpcServer. NettyBlockRpcServer.receive() calls BlockManager.putBlockData() to write the block data into the local storage system
  5. NettyBlockRpcServer returns a success message to the client; the client-side TransportResponseHandler receives the response and invokes the previously registered callback

REFERENCE

  1. Spark内核设计的艺术:架构设计与实现
