Interpreting RDD's Common Compute Interfaces
Scala Operators
Common functional-programming operators on the Scala iterator object Iterator
foreach(): traversal. Applies the function f() to every element of the iterator; the return value of f() is ignored.
def foreach[U](f: A => U) { while (hasNext) f(next()) }
map(): mapping. Applies the function f() to every element of the iterator; the return values of f() form a new iterator.
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
  def hasNext = self.hasNext
  def next() = f(self.next())
}
flatMap(): flat mapping, i.e. map() followed by flattening.
- The function f() maps each element of the iterator to an iterable object.
- The returned iterator iterates, in order, over each iterable returned by f().
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  private def nextCur() { cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
    // but slightly shorter bytecode (better JVM inlining!)
    while (!cur.hasNext) {
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  def next(): B = (if (hasNext) cur else empty).next()
}
filter(): filtering. Keeps the elements that satisfy a predicate. hasNext() advances through the underlying iterator until it finds an element satisfying the predicate p(), which next() then returns.
def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
  private var hd: A = _
  private var hdDefined: Boolean = false
  def hasNext: Boolean = hdDefined || {
    do {
      if (!self.hasNext) return false
      hd = self.next()
    } while (!p(hd))
    hdDefined = true
    true
  }
  def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
}
zipWithIndex(): pairs each element with its index.
def zipWithIndex: Iterator[(A, Int)] = new AbstractIterator[(A, Int)] {
  var idx = 0
  def hasNext = self.hasNext
  def next = {
    val ret = (self.next(), idx)
    idx += 1
    ret
  }
}
reduce(), reduceLeft(), reduceRight(): aggregation. Repeatedly combines elements with the function op(). reduce() and reduceLeft() aggregate from left to right, while reduceRight() aggregates from right to left.
def reduce[A1 >: A](op: (A1, A1) => A1): A1 = reduceLeft(op)
def reduceLeft[B >: A](op: (B, A) => B): B = {
  if (isEmpty)
    throw new UnsupportedOperationException("empty.reduceLeft")
  var first = true
  var acc: B = 0.asInstanceOf[B]
  for (x <- self) {
    if (first) {
      acc = x
      first = false
    }
    else acc = op(acc, x)
  }
  acc
}
def reduceRight[B >: A](op: (A, B) => B): B = {
  if (isEmpty)
    throw new UnsupportedOperationException("empty.reduceRight")
  reversed.reduceLeft[B]((x, y) => op(y, x))
}
fold(), foldLeft(), foldRight(): folding. Same as reduce(), but with an explicitly supplied initial value.
def fold[A1 >: A](z: A1)(op: (A1, A1) => A1): A1 = foldLeft(z)(op)
def foldLeft[B](z: B)(op: (B, A) => B): B = {
  var result = z
  this foreach (x => result = op(result, x))
  result
}
def foldRight[B](z: B)(op: (A, B) => B): B = reversed.foldLeft(z)((x, y) => op(y, x))
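As a quick sanity check of the semantics above, here is a small self-contained sketch (plain Scala standard library, no Spark; the sample values are made up) that exercises these iterator operators:
object IteratorOpsDemo {
  def main(args: Array[String]): Unit = {
    // Iterators are single-pass, so build a fresh one for every operation.
    def nums: Iterator[Int] = Iterator(1, 2, 3, 4)

    println(nums.map(_ * 2).toList)                         // List(2, 4, 6, 8)
    println(nums.filter(_ % 2 == 0).toList)                 // List(2, 4)
    println(nums.flatMap(n => Iterator.fill(n)(n)).toList)  // List(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
    println(nums.zipWithIndex.toList)                       // List((1,0), (2,1), (3,2), (4,3))
    println(nums.reduceLeft(_ + _))                         // 10
    println(nums.foldLeft(100)(_ + _))                      // 110
  }
}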
RDD
MapPartitionsRDD
An RDD that applies a function to every partition of its parent RDD. Its RDD.compute() implementation applies the function f() passed in to the parent RDD's partition and returns the result.
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev) {
  ...
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
  ...
}
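As a usage sketch (a minimal local program; names such as MapPartitionsRDDDemo are made up for illustration), narrow transformations like map() return a MapPartitionsRDD, which also shows up in the lineage:
import org.apache.spark.sql.SparkSession

object MapPartitionsRDDDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
    val sc = spark.sparkContext

    val rdd = sc.parallelize(1 to 10, numSlices = 2)
    // map() builds a MapPartitionsRDD whose compute() applies iter.map(f) to each partition
    val doubled = rdd.map(_ * 2)
    println(doubled.getClass.getSimpleName)  // MapPartitionsRDD
    println(doubled.toDebugString)           // lineage: MapPartitionsRDD <- ParallelCollectionRDD

    spark.stop()
  }
}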
PartitionwiseSampledRDD
An RDD that samples the partitions of its parent RDD. Its RDD.compute() implementation calls RandomSampler.sample() on the sampler passed in, samples the parent RDD's partition, and returns the result.
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {
  ...
  override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = {
    val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]
    val thisSampler = sampler.clone
    thisSampler.setSeed(split.seed)
    thisSampler.sample(firstParent[T].iterator(split.prev, context))
  }
  ...
}
// RandomSampler.scala
def sample(items: Iterator[T]): Iterator[U] =
  items.filter(_ => sample > 0).asInstanceOf[Iterator[U]]
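The sampling itself is just a filter over the partition iterator. A minimal sketch of the same idea using only the standard library (SimpleBernoulliSampler is an illustrative name, not Spark's class):
import scala.util.Random

// Keep each element independently with probability `fraction`, like a Bernoulli sampler.
class SimpleBernoulliSampler[T](fraction: Double, seed: Long) {
  private val rng = new Random(seed)
  def sample(items: Iterator[T]): Iterator[T] =
    items.filter(_ => rng.nextDouble() < fraction)
}

object SamplerDemo {
  def main(args: Array[String]): Unit = {
    val sampler = new SimpleBernoulliSampler[Int](fraction = 0.3, seed = 42L)
    println(sampler.sample(Iterator.range(0, 20)).toList)  // roughly 30% of the elements survive
  }
}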
CartesianRDD
The Cartesian product of two RDDs. Its RDD.compute() implementation returns the Cartesian product of all the elements in the corresponding pair of partitions from the two RDDs.
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {
  ...
  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }
  ...
}
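The for-comprehension in compute() is plain nested iteration; the same pattern on standard-library iterators (toy data) behaves like this. Note that ys is a def, so a fresh iterator is produced for every x, just as compute() calls rdd2.iterator() anew for each element of rdd1:
object CartesianDemo {
  def main(args: Array[String]): Unit = {
    def xs: Iterator[Int] = Iterator(1, 2)
    def ys: Iterator[String] = Iterator("a", "b")

    // Desugars to xs.flatMap(x => ys.map(y => (x, y))): a fresh ys per x
    val pairs = for (x <- xs; y <- ys) yield (x, y)
    println(pairs.toList)  // List((1,a), (1,b), (2,a), (2,b))
  }
}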
ShuffledRDD
The RDD produced after a shuffle. Its RDD.compute() implementation calls ShuffleManager.getReader() to obtain a reader and then fetches the shuffled partition through BlockStoreShuffleReader; this part was analyzed in an earlier post.
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  ...
  override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }
  ...
}
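A quick way to see a ShuffledRDD appear (reusing a local sc as in the sketch above): any aggregation across partitions, such as reduceByKey(), puts one at the top of the lineage.
val words = sc.parallelize(Seq("a", "b", "a", "c"), 2)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)
println(counts.getClass.getSimpleName)  // ShuffledRDD
println(counts.toDebugString)           // ShuffledRDD <- MapPartitionsRDD <- ParallelCollectionRDD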
Spark Operators
Every operator first cleans the closure passed to it. sc.clean() actually calls ClosureCleaner.clean(), which recursively walks the closure's outer references, strips the unnecessary ones, and throws an exception if it encounters a reference that cannot be serialized.
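A hedged illustration of why this check matters (the class and field names below are made up): reading a field from inside the closure captures the enclosing object, and if that object is not serializable the job fails with "Task not serializable".
import org.apache.spark.rdd.RDD

// Multiplier is not Serializable, so a closure that reads this.factor captures `this`.
class Multiplier(val factor: Int) {
  def scaleBad(rdd: RDD[Int]): RDD[Int] =
    rdd.map(x => x * factor)        // fails: org.apache.spark.SparkException: Task not serializable

  def scaleGood(rdd: RDD[Int]): RDD[Int] = {
    val localFactor = factor        // copy the field into a local val; only an Int is captured
    rdd.map(x => x * localFactor)   // closure is now serializable
  }
}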
Transform Operators
The methods in PairRDDFunctions apply to RDDs of tuples and become available automatically through an implicit conversion.
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}
map(): mapping. Constructs a MapPartitionsRDD, passing in a function that maps every element of the partition with cleanF().
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
flatMap(): flat mapping. Constructs a MapPartitionsRDD, passing in a function that flat-maps every element of the partition with cleanF().
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
filter(): filtering. Constructs a MapPartitionsRDD, passing in a function that filters every element of the partition with cleanF().
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
sample(): sampling. Constructs a PartitionwiseSampledRDD and passes in a sampler.
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0,
    s"Fraction must be nonnegative, but got ${fraction}")

  withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}
cartesian(): returns the Cartesian product of two RDDs by constructing a CartesianRDD.
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
distinct(): deduplication, implemented with the reduceByKey() operator.
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
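The expansion is easy to verify by hand (sc as before); both lines below produce the same distinct values:
val nums = sc.parallelize(Seq(1, 2, 2, 3, 3, 3), 2)
println(nums.distinct().collect().sorted.toList)  // List(1, 2, 3)
// The same map -> reduceByKey -> map pipeline that distinct() builds internally
println(nums.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1).collect().sorted.toList)  // List(1, 2, 3)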
reduceByKey(): aggregates by key. A PairRDDFunctions method that calls combineByKeyWithClassTag() to do the aggregation. If a shuffle is produced, this operator combines on the map side.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
groupBy(): grouping. First maps each element with the map() operator, then applies groupByKey() to the mapped RDD.
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
groupByKey(): groups the values of each key, partitioning the result with the given partitioner. A PairRDDFunctions method that calls combineByKeyWithClassTag() to do the aggregation. If a shuffle is produced, this operator does not combine on the map side.
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
combineByKeyWithClassTag(): aggregates elements that share the same key. The createCombiner function builds the initial combined value, the mergeValue function merges a value into a combined value, and the mergeCombiners function merges two combined values.
- If the partitioner is the same as this RDD's, it directly calls the mapPartitions() operator and uses the aggregator to combine each partition in place.
- If the partitioner differs from this RDD's, it constructs a ShuffledRDD and sets the aggregator on it.
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
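The public combineByKey() wrapper exposes the same three functions. A usage sketch (sc as before) computing a per-key average, where the combined value C is a (sum, count) pair:
val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 30.0)), 2)

val avg = scores.combineByKey(
    (v: Double) => (v, 1),                                              // createCombiner
    (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
    (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
  ).mapValues { case (sum, count) => sum / count }

println(avg.collect().toMap)  // Map(a -> 15.0, b -> 30.0)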
Action Operators
All action operators call SparkContext.runJob(), which internally uses DAGScheduler.runJob() to submit the job together with the action function passed in.
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
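For instance, count() is built directly on this: each task returns the size of its partition and the driver sums the results. A sketch using the simpler public runJob overload (sc as before):
val data = sc.parallelize(1 to 100, 4)

// One task per partition; the driver receives one result per partition
val perPartition: Array[Long] = sc.runJob(data, (iter: Iterator[Int]) => iter.size.toLong)
println(perPartition.toList)  // List(25, 25, 25, 25)
println(perPartition.sum)     // 100, which is exactly what data.count() returns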
collect(): gathers all elements of this RDD to the driver and returns them as an array.
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
foreach(): applies a function to every element of the RDD.
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
reduce(): aggregates all elements of the RDD. The reducePartition function aggregates a single partition, and the mergeResult function merges the per-partition results.
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
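Usage sketch (sc as before): each partition is reduced locally by reducePartition, and the driver merges the partial results with mergeResult.
val nums = sc.parallelize(1 to 10, 2)
println(nums.reduce(_ + _))  // 55: (1+...+5) and (6+...+10) are reduced per partition, then merged on the driver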
Document info
- Author: wzx
- Link: https://masterwangzx.com/2020/10/14/spark-api/
- License: free to repost, non-commercial, no derivatives, attribution required (Creative Commons 3.0 license)