Common functional-programming operators of Scala's Iterator

  • foreach(): traversal. Applies f() to every element of the iterator; the return value of f() is discarded.

    def foreach[U](f: A => U) { while (hasNext) f(next()) }
  • map(): mapping. Applies f() to every element of the iterator; the results of f() form a new iterator, computed lazily as next() is called.

    def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
      def hasNext = self.hasNext
      def next() = f(self.next())
    }
  • flatMap(): flat mapping. Applies map() first, then flattens the result.

    • f() maps each element of the iterator to an iterable object.
    • The returned iterator iterates, in order, over each iterable object returned by f().
    def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
      private var cur: Iterator[B] = empty
      private def nextCur() { cur = f(self.next()).toIterator }
      def hasNext: Boolean = {
        // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
        // but slightly shorter bytecode (better JVM inlining!)
        while (!cur.hasNext) {
          if (!self.hasNext) return false
          nextCur()
        }
        true
      }
      def next(): B = (if (hasNext) cur else empty).next()
    }
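  • filter(): filtering. Keeps only the elements that satisfy predicate p(). hasNext() advances through the source iterator until it finds an element satisfying p(); that element is buffered and then returned by next().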

    def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
      private var hd: A = _
      private var hdDefined: Boolean = false
      def hasNext: Boolean = hdDefined || {
        do {
          if (!self.hasNext) return false
          hd = self.next()
        } while (!p(hd))
        hdDefined = true
        true
      }
      def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
    }
  • zipWithIndex(): pairs each element with its index, starting from 0.

    def zipWithIndex: Iterator[(A, Int)] = new AbstractIterator[(A, Int)] {
      var idx = 0
      def hasNext = self.hasNext
      def next = {
        val ret = (self.next(), idx)
        idx += 1
        ret
      }
    }
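  • reduce(), reduceLeft(), reduceRight(): reduction (defined in TraversableOnce, which Iterator extends). Repeatedly combines elements with op(); reduce() and reduceLeft() both combine from left to right, while reduceRight() combines from right to left.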

    def reduce[A1 >: A](op: (A1, A1) => A1): A1 = reduceLeft(op)
    def reduceLeft[B >: A](op: (B, A) => B): B = {
      if (isEmpty)
        throw new UnsupportedOperationException("empty.reduceLeft")
      var first = true
      var acc: B = 0.asInstanceOf[B]
      for (x <- self) {
        if (first) {
          acc = x
          first = false
        }
        else acc = op(acc, x)
      }
      acc
    }
    def reduceRight[B >: A](op: (A, B) => B): B = {
      if (isEmpty)
        throw new UnsupportedOperationException("empty.reduceRight")
      reversed.reduceLeft[B]((x, y) => op(y, x))
    }
  • fold(), foldLeft(), foldRight(): folding. Same as reduce(), reduceLeft() and reduceRight(), but with an explicitly supplied initial value z.

    def fold[A1 >: A](z: A1)(op: (A1, A1) => A1): A1 = foldLeft(z)(op)
    def foldLeft[B](z: B)(op: (B, A) => B): B = {
      var result = z
      this foreach (x => result = op(result, x))
      result
    }
    def foldRight[B](z: B)(op: (A, B) => B): B =
      reversed.foldLeft(z)((x, y) => op(y, x))
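To make the Iterator operators above concrete, here is a minimal sketch that re-implements several of them using only hasNext()/next(). The names myMap, myFlatMap, myFilter, myZipWithIndex, myFoldLeft, myReduceLeft and myFoldRight are hypothetical helpers, not standard-library methods. They follow the structure of the library code shown above with two deliberate differences: myFilter buffers the pending element in an Option rather than an uninitialized var, and myFoldRight reverses the elements through a List.

```scala
// Hypothetical re-implementations (my*) of the Iterator operators above, built only on
// hasNext()/next(); illustrations, not the standard-library source.
object IteratorOpsDemo {
  // map: f runs only when next() is called, so the result is lazy
  def myMap[A, B](self: Iterator[A])(f: A => B): Iterator[B] = new Iterator[B] {
    def hasNext: Boolean = self.hasNext
    def next(): B = f(self.next())
  }

  // flatMap: keep a current inner iterator; when it runs dry, map the next outer element
  def myFlatMap[A, B](self: Iterator[A])(f: A => Iterator[B]): Iterator[B] = new Iterator[B] {
    private var cur: Iterator[B] = Iterator.empty
    def hasNext: Boolean = {
      while (!cur.hasNext) {
        if (!self.hasNext) return false
        cur = f(self.next())
      }
      true
    }
    def next(): B = if (hasNext) cur.next() else Iterator.empty.next()
  }

  // filter: hasNext() pulls elements until one satisfies p and buffers it until next()
  def myFilter[A](self: Iterator[A])(p: A => Boolean): Iterator[A] = new Iterator[A] {
    private var hd: Option[A] = None
    def hasNext: Boolean = hd.isDefined || {
      while (hd.isEmpty && self.hasNext) {
        val x = self.next()
        if (p(x)) hd = Some(x)
      }
      hd.isDefined
    }
    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      val x = hd.get
      hd = None
      x
    }
  }

  // zipWithIndex: pair each element with a counter starting at 0
  def myZipWithIndex[A](self: Iterator[A]): Iterator[(A, Int)] = new Iterator[(A, Int)] {
    private var idx = 0
    def hasNext: Boolean = self.hasNext
    def next(): (A, Int) = {
      val ret = (self.next(), idx)
      idx += 1
      ret
    }
  }

  // foldLeft: thread an accumulator through the elements from left to right
  def myFoldLeft[A, B](self: Iterator[A])(z: B)(op: (B, A) => B): B = {
    var result = z
    self.foreach(x => result = op(result, x))
    result
  }

  // reduceLeft: a foldLeft that uses the first element as the initial value
  def myReduceLeft[A](self: Iterator[A])(op: (A, A) => A): A = {
    if (!self.hasNext) throw new UnsupportedOperationException("empty.reduceLeft")
    val first = self.next()
    myFoldLeft(self)(first)(op)
  }

  // foldRight: reverse the elements, then foldLeft with the operator's arguments swapped
  def myFoldRight[A, B](self: Iterator[A])(z: B)(op: (A, B) => B): B =
    myFoldLeft(self.toList.reverse.iterator)(z)((acc, x) => op(x, acc))

  def main(args: Array[String]): Unit = {
    var calls = 0
    val mapped = myMap(Iterator(1, 2, 3)) { x => calls += 1; x * 10 }
    println(calls)         // 0: nothing has been consumed yet
    println(mapped.toList) // List(10, 20, 30)
    println(calls)         // 3
    println(myFlatMap(Iterator(1, 2, 3))(n => Iterator.fill(n)(n)).toList) // List(1, 2, 2, 3, 3, 3)
    println(myFilter(Iterator(1, 2, 3, 4, 5))(_ % 2 == 1).toList)        // List(1, 3, 5)
    println(myZipWithIndex(Iterator("a", "b")).toList)                   // List((a,0), (b,1))
    println(myReduceLeft(Iterator(1, 2, 3, 4))(_ - _))                   // -8, i.e. ((1 - 2) - 3) - 4
    println(myFoldRight(Iterator(1, 2, 3, 4))(0)(_ - _))                 // -2, i.e. 1 - (2 - (3 - (4 - 0)))
  }
}
```

The first three lines of main show the laziness shared by map, flatMap, filter and zipWithIndex: no element is touched until the resulting iterator is consumed.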




private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
  // ... other members omitted
}
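MapPartitionsRDD.compute() only wraps the parent partition's iterator with f. A chain of such RDDs therefore becomes a chain of nested iterators that are pulled one element at a time. The sketch below models this structure in plain Scala. ToyRDD, ToyParallelRDD, ToyMapPartitionsRDD and ToyMapPartitionsDemo are hypothetical classes, and the model assumes in-memory partitions with no TaskContext. It illustrates the shape of the code, not Spark's implementation.

```scala
// Toy, single-process model of MapPartitionsRDD (hypothetical classes, not Spark's API):
// every node exposes compute(partitionIndex), and a child wraps its parent's iterator with f.
abstract class ToyRDD[T] {
  def numPartitions: Int
  def compute(split: Int): Iterator[T]

  // map/filter build a new node whose function works on the parent's whole partition iterator
  def map[U](f: T => U): ToyRDD[U] =
    new ToyMapPartitionsRDD[U, T](this, (pid, iter) => iter.map(f))
  def filter(p: T => Boolean): ToyRDD[T] =
    new ToyMapPartitionsRDD[T, T](this, (pid, iter) => iter.filter(p))
}

// Leaf node backed by in-memory partitions
class ToyParallelRDD[T](data: Seq[Seq[T]]) extends ToyRDD[T] {
  def numPartitions: Int = data.size
  def compute(split: Int): Iterator[T] = data(split).iterator
}

// Same shape as MapPartitionsRDD.compute: f(partition index, parent partition's iterator)
class ToyMapPartitionsRDD[U, T](prev: ToyRDD[T], f: (Int, Iterator[T]) => Iterator[U])
    extends ToyRDD[U] {
  def numPartitions: Int = prev.numPartitions
  def compute(split: Int): Iterator[U] = f(split, prev.compute(split))
}

object ToyMapPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val result = new ToyParallelRDD(Seq(Seq(1, 2, 3), Seq(4, 5))).map(_ * 2).filter(_ > 4)
    for (i <- 0 until result.numPartitions) println(s"partition $i: ${result.compute(i).toList}")
    // partition 0: List(6)
    // partition 1: List(8, 10)
  }
}
```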



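An RDD that samples each partition of its parent RDD. Its compute() implementation clones the sampler that was passed in and calls RandomSampler.sample() on the parent partition's iterator, returning the sampled result.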

private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {

  override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = {
    val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]
    val thisSampler = sampler.clone
    thisSampler.setSeed(split.seed)
    thisSampler.sample(firstParent[T].iterator(split.prev, context))
  }
  // ... other members omitted
}


// RandomSampler.scala
def sample(items: Iterator[T]): Iterator[U] =
  items.filter(_ => sample > 0).asInstanceOf[Iterator[U]]
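RandomSampler.sample(items) is just a lazy filter driven by the per-item count returned by sample(), and PartitionwiseSampledRDD clones the sampler for each partition. The following minimal sketch builds a Bernoulli sampler in that spirit. ToyBernoulliSampler, cloneSampler, setSeed and samplePartition are hypothetical names. The per-item trial on scala.util.Random is an assumption; Spark's own samplers may use additional optimizations.

```scala
import scala.util.Random

// Hypothetical toy sampler: sample() says how many times the current item is kept (0 or 1),
// and sample(items) keeps the items for which that count is > 0, as RandomSampler.sample does.
class ToyBernoulliSampler[T](fraction: Double) {
  private var rng = new Random()

  def setSeed(seed: Long): Unit = rng = new Random(seed)

  // Bernoulli trial: keep the item with probability `fraction`
  def sample(): Int = if (rng.nextDouble() < fraction) 1 else 0

  // Lazy filter over the partition iterator
  def sample(items: Iterator[T]): Iterator[T] = items.filter(_ => sample() > 0)

  // A fresh copy, so partitions never share RNG state
  def cloneSampler(): ToyBernoulliSampler[T] = new ToyBernoulliSampler[T](fraction)
}

object ToySamplingDemo {
  // Per-partition pattern: clone the sampler, seed the clone (assumed per-partition seed), then sample
  def samplePartition[T](sampler: ToyBernoulliSampler[T], partitionSeed: Long, items: Iterator[T]): List[T] = {
    val thisSampler = sampler.cloneSampler()
    thisSampler.setSeed(partitionSeed)
    thisSampler.sample(items).toList
  }

  def main(args: Array[String]): Unit = {
    val sampler = new ToyBernoulliSampler[Int](0.3)
    val a = samplePartition(sampler, 42L, (1 to 20).iterator)
    val b = samplePartition(sampler, 42L, (1 to 20).iterator)
    println(a)      // some subset of 1..20, in the original order
    println(a == b) // true: the same seed reproduces the same sample
  }
}
```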



class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {

  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }
  // ... other members omitted
}
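The for-comprehension in CartesianRDD.compute() desugars to flatMap and map on Iterator. Because the second generator (rdd2.iterator(...)) ends up inside the flatMap closure, its expression is re-evaluated once for every element of the first partition. The sketch below uses plain iterators and a hypothetical CartesianDemo object to count how often the right-hand iterator is created.

```scala
object CartesianDemo {
  // Same for-comprehension shape as CartesianRDD.compute
  def cartesian[T, U](left: () => Iterator[T], right: () => Iterator[U]): Iterator[(T, U)] =
    for (x <- left(); y <- right()) yield (x, y)

  // Explicitly desugared form: right() is evaluated inside the flatMap closure, once per x
  def cartesianDesugared[T, U](left: () => Iterator[T], right: () => Iterator[U]): Iterator[(T, U)] =
    left().flatMap(x => right().map(y => (x, y)))

  def main(args: Array[String]): Unit = {
    var rightCalls = 0 // counts how often the "right partition" iterator is (re)created
    val right = () => { rightCalls += 1; Iterator("a", "b") }
    println(cartesian(() => Iterator(1, 2, 3), right).toList)
    // List((1,a), (1,b), (2,a), (2,b), (3,a), (3,b))
    println(rightCalls) // 3: once per element of the left side
  }
}
```

In the RDD case, this suggests that rdd2's partition is iterated once per element of rdd1's partition, which is worth keeping in mind when the second RDD is expensive to compute.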




class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
  // ... other members omitted
}
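ShuffledRDD.compute() reads only the shuffle output for its own partition, the range split.index to split.index + 1. As an illustration only, here is an in-memory model of that write/read split. ToyShuffleDemo, shuffleWrite and shuffleRead are hypothetical. The bucket-by-hashCode rule is an assumption in the spirit of HashPartitioner, not Spark's shuffle implementation.

```scala
// In-memory model of a shuffle (hypothetical helpers; not Spark's ShuffleManager API).
object ToyShuffleDemo {
  // Non-negative modulo of the key's hashCode, in the spirit of HashPartitioner
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // "Shuffle write": every map-side partition is split into numPartitions buckets
  def shuffleWrite[K, V](mapSide: Seq[Seq[(K, V)]], numPartitions: Int): Seq[Seq[Seq[(K, V)]]] =
    mapSide.map { part =>
      val buckets = part.groupBy { case (k, _) => partitionFor(k, numPartitions) }
      (0 until numPartitions).map(i => buckets.getOrElse(i, Seq.empty[(K, V)]))
    }

  // "Shuffle read" for one reduce partition: fetch bucket reduceId from every map output
  def shuffleRead[K, V](mapOutputs: Seq[Seq[Seq[(K, V)]]], reduceId: Int): Iterator[(K, V)] =
    mapOutputs.iterator.flatMap(buckets => buckets(reduceId).iterator)

  def main(args: Array[String]): Unit = {
    val input = Seq(Seq("a" -> 1, "b" -> 2), Seq("a" -> 3, "c" -> 4))
    val outputs = shuffleWrite(input, numPartitions = 2)
    for (r <- 0 until 2) println(s"reduce partition $r: ${shuffleRead(outputs, r).toList}")
    // reduce partition 0: List((b,2))
    // reduce partition 1: List((a,1), (a,3), (c,4))
  }
}
```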






implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}
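rddToPairRDDFunctions is an implicit conversion. When a pair-only method such as reduceByKey() is called on an RDD[(K, V)], the compiler wraps the RDD in PairRDDFunctions. Here is a minimal sketch of the same pattern on a plain Seq of pairs. ToyPairFunctions, ToyPairImplicits and ToyPairDemo are hypothetical names.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for PairRDDFunctions: methods that only make sense for key-value data
class ToyPairFunctions[K, V](self: Seq[(K, V)]) {
  def reduceByKey(func: (V, V) => V): Map[K, V] =
    self.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(func) }
}

object ToyPairImplicits {
  // Same idea as rddToPairRDDFunctions: wrap a collection of pairs on demand
  implicit def seqToToyPairFunctions[K, V](xs: Seq[(K, V)]): ToyPairFunctions[K, V] =
    new ToyPairFunctions(xs)
}

object ToyPairDemo {
  import ToyPairImplicits._

  def main(args: Array[String]): Unit = {
    val words = Seq("a" -> 1, "b" -> 1, "a" -> 1)
    // Seq has no reduceByKey; the compiler rewrites this to seqToToyPairFunctions(words).reduceByKey(...)
    println(words.reduceByKey(_ + _)) // contains a -> 2 and b -> 1
  }
}
```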

  • map(): mapping. Constructs a MapPartitionsRDD and passes it a function that maps every element of the partition with cleanF().

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
  • flatMap(): flat mapping. Constructs a MapPartitionsRDD and passes it a function that flat-maps every element of the partition with cleanF().

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }
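  • filter(): filtering. Constructs a MapPartitionsRDD and passes it a function that keeps the elements of the partition for which cleanF() returns true. Filtering does not change keys, so preservesPartitioning = true.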

    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
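  • sample(): sampling. Constructs a PartitionwiseSampledRDD with a sampler: a PoissonSampler when sampling with replacement, otherwise a BernoulliSampler.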

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T] = {
      require(fraction >= 0,
        s"Fraction must be nonnegative, but got ${fraction}")
      withScope {
        require(fraction >= 0.0, "Negative fraction value: " + fraction)
        if (withReplacement) {
          new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
        } else {
          new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
        }
      }
    }
  • cartesian(): returns the Cartesian product of two RDDs by constructing a CartesianRDD.

    def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
      new CartesianRDD(sc, this, other)
    }
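  • distinct(): deduplication, implemented with the reduceByKey() operator. Each element x becomes the pair (x, null), the pairs are reduced by key, and only the keys are kept.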

    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
      map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    }
  • reduceByKey(): aggregates values by key. A method of PairRDDFunctions that calls combineByKeyWithClassTag(). If a shuffle occurs, this operator first combines values on the map side.

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
  • groupBy(): grouping. First uses map() to turn each element t into the pair (f(t), t), then applies groupByKey() to the mapped RDD.

    def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
        : RDD[(K, Iterable[T])] = withScope {
      val cleanF = sc.clean(f)
      this.map(t => (cleanF(t), t)).groupByKey(p)
    }
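  • groupByKey(): groups values by key, repartitioning the RDD with the given partitioner. A method of PairRDDFunctions that calls combineByKeyWithClassTag(). If a shuffle occurs, this operator does not combine on the map side.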

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
      // groupByKey shouldn't use map side combine because map side combine does not
      // reduce the amount of data shuffled and requires all map side data be inserted
      // into a hash table, leading to more objects in the old gen.
      val createCombiner = (v: V) => CompactBuffer(v)
      val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
      val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
      val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
        createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
      bufs.asInstanceOf[RDD[(K, Iterable[V])]]
    }
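  • combineByKeyWithClassTag(): aggregates elements that share a key. createCombiner builds the initial combined value from a value, mergeValue merges a value into a combined value, and mergeCombiners merges two combined values.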

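    • If the given partitioner equals this RDD's partitioner, it calls mapPartitions() directly and uses the aggregator to combine each partition in place, with no shuffle.
    • Otherwise, it constructs a ShuffledRDD and sets the aggregator on it.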
    def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
      require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
      if (keyClass.isArray) {
        if (mapSideCombine) {
          throw new SparkException("Cannot use map-side combining with array keys.")
        }
        if (partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
      }
      val aggregator = new Aggregator[K, V, C](
        self.context.clean(createCombiner),
        self.context.clean(mergeValue),
        self.context.clean(mergeCombiners))
      if (self.partitioner == Some(partitioner)) {
        self.mapPartitions(iter => {
          val context = TaskContext.get()
          new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
        }, preservesPartitioning = true)
      } else {
        new ShuffledRDD[K, V, C](self, partitioner)
          .setSerializer(serializer)
          .setAggregator(aggregator)
          .setMapSideCombine(mapSideCombine)
      }
    }
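The contract of combineByKeyWithClassTag(), and the way reduceByKey(), groupByKey(), groupBy() and distinct() are built on it, can be sketched in plain Scala. ToyCombineByKey is a hypothetical single-machine model. Partitions are Seqs and every key goes to a single reducer. The two branches mimic map-side combining versus shipping raw records and building combiners on the reduce side.

```scala
import scala.collection.mutable

// Hypothetical single-machine model of combineByKeyWithClassTag (not Spark code).
object ToyCombineByKey {
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      mapSideCombine: Boolean): Map[K, C] = {
    // Builds one combiner per key from raw values (the role of Aggregator.combineValuesByKey)
    def combineValues(records: Iterator[(K, V)]): mutable.LinkedHashMap[K, C] = {
      val acc = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, v) =>
        acc(k) = acc.get(k) match {
          case Some(c) => mergeValue(c, v)
          case None    => createCombiner(v)
        }
      }
      acc
    }

    if (mapSideCombine) {
      // Each map partition is pre-combined; the reducer only merges combiners per key
      val merged = mutable.LinkedHashMap.empty[K, C]
      partitions.foreach { p =>
        combineValues(p.iterator).foreach { case (k, c) =>
          merged(k) = merged.get(k) match {
            case Some(existing) => mergeCombiners(existing, c)
            case None           => c
          }
        }
      }
      merged.toMap
    } else {
      // Raw records are shuffled unchanged; the reducer builds combiners from the values
      combineValues(partitions.iterator.flatMap(_.iterator)).toMap
    }
  }

  // reduceByKey: the value itself is the combiner, and func serves both merge steps
  def reduceByKey[K, V](parts: Seq[Seq[(K, V)]], func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](parts, v => v, func, func, mapSideCombine = true)

  // groupByKey: combiners are buffers, and map-side combining is disabled
  def groupByKey[K, V](parts: Seq[Seq[(K, V)]]): Map[K, Vector[V]] =
    combineByKey[K, V, Vector[V]](parts, v => Vector(v), _ :+ _, _ ++ _, mapSideCombine = false)

  // groupBy: key each element with f, then groupByKey
  def groupBy[T, K](parts: Seq[Seq[T]], f: T => K): Map[K, Vector[T]] =
    groupByKey(parts.map(_.map(t => (f(t), t))))

  // distinct: pair each element with null, reduce by key, keep the keys
  def distinct[T](parts: Seq[Seq[T]]): Set[T] =
    reduceByKey[T, Null](parts.map(_.map(x => (x, null))), (x, _) => x).keySet

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a" -> 1, "b" -> 2, "a" -> 3), Seq("a" -> 4, "b" -> 5))
    println(reduceByKey[String, Int](parts, _ + _)) // a -> 8, b -> 7
    println(groupByKey(parts))                      // a -> Vector(1, 3, 4), b -> Vector(2, 5)
    println(distinct(Seq(Seq(1, 2, 2), Seq(2, 3)))) // the set {1, 2, 3}
  }
}
```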



def runJob[T, U: ClassTag](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  // ...
}
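The runJob() contract is simple: func runs on each requested partition, and every (partition index, result) pair is handed to resultHandler. The following sequential sketch assumes in-memory partitions and uses hypothetical names (ToyRunJob, runJobToArray, collect). It does not model the DAGScheduler, stages, or task failures. The collect helper mirrors the Array.concat pattern of RDD.collect().

```scala
import scala.reflect.ClassTag

// Hypothetical, sequential model of the runJob contract (no scheduler, no executors).
object ToyRunJob {
  // Run func on each requested partition and pass (partitionIndex, result) to resultHandler
  def runJob[T, U](
      partitionsData: IndexedSeq[Seq[T]],
      func: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit =
    partitions.foreach(p => resultHandler(p, func(partitionsData(p).iterator)))

  // Convenience variant: run on all partitions and store each result at its partition index
  def runJobToArray[T, U: ClassTag](partitionsData: IndexedSeq[Seq[T]], func: Iterator[T] => U): Array[U] = {
    val results = new Array[U](partitionsData.size)
    runJob[T, U](partitionsData, func, partitionsData.indices, (index, res) => results(index) = res)
    results
  }

  // collect(): each partition becomes an Array, then the arrays are concatenated in partition order
  def collect[T: ClassTag](partitionsData: IndexedSeq[Seq[T]]): Array[T] = {
    val results = runJobToArray(partitionsData, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def main(args: Array[String]): Unit = {
    val data = IndexedSeq(Seq(1, 2), Seq.empty[Int], Seq(3))
    runJob[Int, Int](data, _.sum, Seq(0, 2), (i, s) => println(s"partition $i -> $s"))
    // partition 0 -> 3
    // partition 2 -> 3
    println(collect(data).toList) // List(1, 2, 3)
  }
}
```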

  • collect(): collects all elements of this RDD to the driver and returns them as an array.

    def collect(): Array[T] = withScope {
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
  • foreach(): applies a function to every element of the RDD.

    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
  • reduce(): aggregates all elements of the RDD. The reducePartition function reduces each partition, and the mergeResult function merges the per-partition results.

    def reduce(f: (T, T) => T): T = withScope {
      val cleanF = sc.clean(f)
      val reducePartition: Iterator[T] => Option[T] = iter => {
        if (iter.hasNext) {
          Some(iter.reduceLeft(cleanF))
        } else {
          None
        }
      }
      var jobResult: Option[T] = None
      val mergeResult = (index: Int, taskResult: Option[T]) => {
        if (taskResult.isDefined) {
          jobResult = jobResult match {
            case Some(value) => Some(f(value, taskResult.get))
            case None => taskResult
          }
        }
      }
      sc.runJob(this, reducePartition, mergeResult)
      // Get the final result out of our Option, or throw an exception if the RDD was empty
      jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }
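RDD.reduce() works in two phases. Each task reduces its partition to an Option (None for an empty partition), and mergeResult folds the task results into jobResult on the driver as they arrive. Arrival order is not fixed, so f is expected to be commutative and associative. The following minimal sketch uses a hypothetical ToyReduce object with an explicit arrivalOrder parameter, an assumption used to simulate task completion order.

```scala
// Hypothetical model of RDD.reduce's two phases (plain Scala, no Spark).
object ToyReduce {
  def reduce[T](partitions: Seq[Seq[T]], f: (T, T) => T, arrivalOrder: Seq[Int]): T = {
    // Phase 1 (per task): empty partitions yield None instead of throwing
    val reducePartition: Iterator[T] => Option[T] = iter =>
      if (iter.hasNext) Some(iter.reduceLeft(f)) else None

    // Phase 2 (driver): merge each task result into jobResult as it arrives
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None        => taskResult
        }
      }
    }

    arrivalOrder.foreach(i => mergeResult(i, reducePartition(partitions(i).iterator)))
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

  def main(args: Array[String]): Unit = {
    println(reduce[Int](Seq(Seq(1, 2), Seq.empty[Int], Seq(3, 4)), _ + _, Seq(2, 0, 1))) // 10
    // A non-commutative f makes the result depend on arrival order
    println(reduce[String](Seq(Seq("a"), Seq("b")), _ + _, Seq(1, 0))) // ba
    println(reduce[String](Seq(Seq("a"), Seq("b")), _ + _, Seq(0, 1))) // ab
  }
}
```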



