WZX's blog 海滩上捡贝壳的孩子

Spark源码阅读(二十四):常用RDD计算接口详解

2020-10-14
wzx

解读RDD常用的计算接口

Scala算子

Scala 迭代器对象Iterator中常见函数式编程算子

  • foreach(): 遍历。对迭代器的所有元素应用函数f(),函数f()的返回值将被忽略

    def foreach[U](f: A => U) { while (hasNext) f(next()) }
    
  • map(): 映射。对迭代器的所有元素应用函数f(),函数f()的返回值将组成一个新的迭代器

    def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
      def hasNext = self.hasNext
      def next() = f(self.next())
    }
    
  • flatMap(): 扁平化映射。先map()再扁平化

    • 函数f()将迭代器中每个元素都映射成了一个可迭代的对象
    • 返回的迭代器会按顺序迭代函数f()返回的每个可迭代对象
    def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
      private var cur: Iterator[B] = empty
      private def nextCur() { cur = f(self.next()).toIterator }
      def hasNext: Boolean = {
        // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
        // but slightly shorter bytecode (better JVM inlining!)
        while (!cur.hasNext) {
          if (!self.hasNext) return false
          nextCur()
        }
        true
      }
      def next(): B = (if (hasNext) cur else empty).next()
    }
    
  • filter(): 过滤。过滤出符合一定条件的元素。hasNext()方法会遍历原迭代器中的所有元素直到符合函数p(),由next()返回

    def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
      private var hd: A = _
      private var hdDefined: Boolean = false
      
      def hasNext: Boolean = hdDefined || {
        do {
          if (!self.hasNext) return false
          hd = self.next()
        } while (!p(hd))
        hdDefined = true
        true
      }
      
      def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
    }
    
  • zipWithIndex(): 给每个元素加上索引

    def zipWithIndex: Iterator[(A, Int)] = new AbstractIterator[(A, Int)] {
      var idx = 0
      def hasNext = self.hasNext
      def next = {
        val ret = (self.next(), idx)
        idx += 1
        ret
      }
    }
    
  • reduce(), reduceLeft)(), reduceRight(): 聚合。不断迭代并使用函数op()进行聚合,reduce()reduceLeft()都是从左往右聚合,而reduceRight()是从右往左聚合

    def reduce[A1 >: A](op: (A1, A1) => A1): A1 = reduceLeft(op)
      
    def reduceLeft[B >: A](op: (B, A) => B): B = {
      if (isEmpty)
      throw new UnsupportedOperationException("empty.reduceLeft")
      
      var first = true
      var acc: B = 0.asInstanceOf[B]
      
      for (x <- self) {
        if (first) {
          acc = x
          first = false
        }
        else acc = op(acc, x)
      }
      acc
    }
      
    def reduceRight[B >: A](op: (A, B) => B): B = {
      if (isEmpty)
      throw new UnsupportedOperationException("empty.reduceRight")
      
      reversed.reduceLeft[B]((x, y) => op(y, x))
    }
    
  • fold(), foldLeft(), foldRight(): 折叠。与reduce()一致但是多了一个指定的初始值

    def fold[A1 >: A](z: A1)(op: (A1, A1) => A1): A1 = foldLeft(z)(op)
      
    def foldLeft[B](z: B)(op: (B, A) => B): B = {
      var result = z
      this foreach (x => result = op(result, x))
      result
    }
      
    def foldRight[B](z: B)(op: (A, B) => B): B =
      reversed.foldLeft(z)((x, y) => op(y, x))
    

RDD

MapPartitionsRDD

将函数应用于父RDD的每个partition的RDD。实现的RDD.compute()计算函数将传递进来的函数f()应用于父RDD的partition并返回结果

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
...

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
    
...
}

PartitionwiseSampledRDD

对其父RDD partition进行抽样的RDD。实现的RDD.compute()计算函数将调用传递进来的抽样器的RandomSampler.sample()对父RDD的partition抽样并返回结果

private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {
...
    
  override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = {
    val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]
    val thisSampler = sampler.clone
    thisSampler.setSeed(split.seed)
    thisSampler.sample(firstParent[T].iterator(split.prev, context))
  }

...
}

// RandomSampler.scala
def sample(items: Iterator[T]): Iterator[U] =
  items.filter(_ => sample > 0).asInstanceOf[Iterator[U]]

CartesianRDD

两个RDD的笛卡尔积。实现的RDD.compute()计算函数将返回两个RDD相同partition内的所有元素的笛卡尔积

private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {
...

  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

...
}

ShuffledRDD

shuffle过后的RDD。实现的RDD.compute()计算函数调用ShuffleManager.getReader()获取,再通过BlockStoreShuffleReader获取shuffle后的partition,这部分前文已经分析过

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
...
    
  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

...
}

Spark算子

每个算子内都先对传入的闭包进行了清理sc.clean()实际上调用了ClosureCleaner.clean()方法对闭包进行清理,递归闭包内的外部引用,去除为必要的引用,如果遇到不可序列化的引用则抛出异常。

Transform算子

PairRDDFunctions中的方法是针对tuple类型的RDD,通过隐式转换自动调用

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}


  • map(): 映射。构造MapPartitionsRDD,并传入对partition中每个元素都使用cleanF()函数映射的函数

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
    
  • flatMap(): 扁平化映射。构造MapPartitionsRDD,并传入对partition中每个元素都使用cleanF()函数扁平化映射的函数

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }
    
  • filter(): 过滤。构造MapPartitionsRDD,并传入对partition中每个元素都使用cleanF()函数过滤的函数

    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
    
  • sample(): 抽样。构造MapPartitionsRDD,并传入抽样器

    def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = {
      require(fraction >= 0,
              s"Fraction must be nonnegative, but got ${fraction}")
      
      withScope {
        require(fraction >= 0.0, "Negative fraction value: " + fraction)
        if (withReplacement) {
          new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
        } else {
          new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
        }
      }
    }
    
  • cartesian(): 返回两个RDD的笛卡尔积。构造CartesianRDD

    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      new CartesianRDD(sc, this, other)
    }
    
  • distinct(): 去重。使用了reduceByKey()算子实现了去重

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
    
  • reduceByKey(): 根据key进行聚合PairRDDFunctions的方法,调用了combineByKeyWithClassTag()方法进行聚合。如果产生shuffle,此算子会在map端进行合并

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
    
  • groupBy(): 分组。先使用map()算子进行映射,在对映射后的RDD使用groupByKey()算子

    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
      val cleanF = sc.clean(f)
      this.map(t => (cleanF(t), t)).groupByKey(p)
    }
    
  • groupByKey(): 根据分区器对RDD进行重新分区PairRDDFunctions的方法,调用combineByKeyWithClassTag()方法进行聚合。如果产生shuffle,此算子不会在map端进行合并

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
    
  • combineByKeyWithClassTag(): 将相同key的元素聚合createCombiner函数用于生成聚合初始值,mergeValue函数用于将value和聚合值进行聚合生成聚合值,mergeCombiners用于两个聚合值的合并

    • 如果分区器与此RDD一致,则直接调用mapPartitions()算子,在此算子内使用聚合器对每个partition进行聚合
    • 如果分区器与此RDD不一致,则构造ShuffledRDD并指定聚合器
    def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
      }
    }
    

Action算子

ction算子都调用SparkContext.runJob()方法,内部使用了DagScheduler.runJob()去提交job,并指定传入的动作函数

def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}


  • collect(): 将此RDD中的所有元素收集到driver以一个数组形式返回

    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
    
  • foreach(): 对RDD中的所有元素应用函数

    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    
  • reduce(): 对RDD中的所有元素进行聚合reducePartition函数对partition进行聚合,mergeResult函数对各个partition聚合的结果进行聚合

    def reduce(f: (T, T) => T): T = withScope {
      val cleanF = sc.clean(f)
      val reducePartition: Iterator[T] => Option[T] = iter => {
        if (iter.hasNext) {
          Some(iter.reduceLeft(cleanF))
        } else {
          None
        }
      }
      var jobResult: Option[T] = None
      val mergeResult = (index: Int, taskResult: Option[T]) => {
        if (taskResult.isDefined) {
          jobResult = jobResult match {
            case Some(value) => Some(f(value, taskResult.get))
            case None => taskResult
          }
        }
      }
      sc.runJob(this, reducePartition, mergeResult)
      // Get the final result out of our Option, or throw an exception if the RDD was empty
      jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }
    

Similar Posts

Comments