Spark源码和调优简介 Spark Core(下)

来源:腾讯技术工程微信号
作者:calvinrzluo丨腾讯 IEG 后台开发工程师

本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正。为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容。

Shuffle

Shuffle 机制是 Spark Core 的核心内容。在 Stage 和 Stage 之间,Spark 需要 Shuffle 数据。这个流程包含上一个 Stage 上的 Shuffle Write,中间的数据传输,以及下一个 Stage 的 Shuffle Read。如下图所示:

Shuffle 类操作常常发生在宽依赖的 RDD 之间,这类算子需要将多个节点上的数据拉取到同一节点上进行计算,其中存在大量磁盘 IO、序列化和网络传输开销,它们可以分为以下几点来讨论。

当 Spark 中的某个节点故障之后,常常需要重算 RDD 中的某几个分区。对于窄依赖而言,父 RDD 的一个分区只对应一个子 RDD 分区,因此丢失子 RDD 的分区,重算整个父 RDD 分区是必要的。而对于宽依赖而言,父 RDD 会被多个子 RDD 使用,而可能当前丢失的子 RDD 只使用了父 RDD 中的某几个分区的数据,而我们仍然要重新计算整个父 RDD,这造成了计算资源的浪费。

当使用 Aggregate 类(如groupByKey)或者 Join 类这种 Shuffle 算子时,如果选择的key上的数据是倾斜(skew)的,会导致部分节点上的负载增大。对于这种情况除了可以增加 Executor 的内存,还可以重新选择分区函数(例如在之前的 key 上加盐)来平衡分区。

Shuffle Read 操作容易产生 OOM,其原因是尽管在BlockStoreShuffleReader中会产生外部排序的resultIter,但在这之前,ExternalAppendOnlyMap先要从 BlockManager 拉取数据(k, v)到自己的currentMap中,如果这里的v很大,那么就会导致 Executor 的 OOM 问题。可以从PairRDDFunctions的文档中佐证这一点。在Dataset中并没有reduceByKey,原因可能与 Catalyst Optimizer 的优化有关,但考虑到groupByKey还是比较坑的,感觉这个举措并不明智。

Shuffle 考古

在 Spark0.8 版本前,Spark 只有 Hash Based Shuffle 的机制。在这种方式下,假定 Shuffle Write 阶段(有的也叫 Map 阶段)有W个 Task,在 Shuffle Read 阶段(有的也叫 Reduce 阶段)有R个 Task,那么就会产生W*R个文件。这样的坏处是对文件系统产生很大压力,并且 IO 也差(随机读写)。由于这些文件是先全量在内存里面构造,再 dump 到磁盘上,所以 Shuffle 在 Write 阶段就很可能 OOM。

为了解决这个问题,在 Spark 0.8.1 版本加入了 File Consolidation,以求将W个 Task 的输出尽可能合并。现在,Executor 上的每一个执行单位都生成自己独一份的文件。假定所有的 Executor 总共有C个核心,每个 Task 占用T个核心,那么总共有C/T个执行单位。考虑极端情况,如果C==T,那么任务实际上是串行的,所以写一个文件就行了。因此,最终会生成C/T*R个文件。

但这个版本仍然没有解决 OOM 的问题。虽然对于 reduce 这类操作,比如count,因为是来一个 combine 一个,所以只要你的 V 不是数组,也不想强行把结果 concat 成一个数组,一般都没有较大的内存问题。但是考虑如果我们执行groupByKey这样的操作,在 Read 阶段每个 Task 需要得到得到自己负责的 key 对应的所有 value,而我们现在每个 Task 得到的是若干很大的文件,这个文件里面的 key 是杂乱无章的。如果我们需要得到一个 key 对应的所有 value,那么我们就需要遍历这个文件,将 key 和对应的 value 全部存放在一个结构比如 HashMap 中,并进行合并。因此,我们必须保证这个 HashMap 足够大。既然如此,我们很容易想到一个基于外部排序的方案,我们为什么不能对 key 进行外排呢?确实在 Hadoop MapReduce 中会做归并排序,因此 Reducer 侧的数据按照 key 组织好的了。但 Spark 在这个版本没有这么做,并且 Spark 在下一个版本就这么做了。

在 Spark 0.9 版本之后,引入了ExternalAppendOnlyMap,通过这个结构,SparkShuffle 在 combine 的时候如果内存不够,就能 Spill 到磁盘,并在 Spill 的时候进行排序。当然,内存还是要能承载一个 KV 的,我们将在稍后的源码分析中深入研究这个问题。

终于在 Spark1.1 版本之后引入了 Sorted Based Shuffle。此时,Shuffle Write 阶段会按照 Partition ID 以及 key 对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,Shuffle Read 的 Task 可以通过该索引文件获取相关的数据。

在 Spark 1.5,Tungsten内存管理机制成为了 Spark 的默认选项。如果关闭spark.sql.tungsten.enabled,Spark 将采用基于 Kryo 序列化的列式存储格式。

Shuffle Read 端源码分析

Shuffle Read 一般位于一个 Stage 的开始,这时候上一个 Stage 会给我们留下一个 ShuffledRDD。在它的compute方法中会首先取出shuffleManager: ShuffleManager

ShuffleManager是一个 Trait,它的两个实现就是org.apache.spark.shuffle.hash.HashShuffleManager
org.apache.spark.shuffle.sort.SortShuffleManager

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]  val metrics = context.taskMetrics().createTempShuffleReadMetrics()  SparkEnv.get.shuffleManager // 由SparkEnv维护的ShuffleManager...

接着,我们调用shuffleManager.getReader方法返回一个BlockStoreShuffleReader,它用来读取[split.index, split.index + 1)这个区间内的 Shuffle 数据。接着,它会调用SparkEnv.get.mapOutputTrackergetMapSizesByExecutorId方法。

getMapSizesByExecutorId返回一个迭代器Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],表示对于某个BlockManagerId,它所存储的 Shuffle Write 中间结果,包括BlockId、大小和 index。
具体实现上,这个方法首先从传入的dep.shuffleHandle中获得当前 Shuffle 过程的唯一标识shuffleId,然后它会从自己维护的shuffleStatuses中找到shuffleId对应的MapStatus,它应该有endPartition-startPartition这么多个。接着,对这些MapStatus,调用convertMapStatuses获得迭代器。在compute中,实际上就只取当前split这一个 Partition 的 Shuffle 元数据。

...    .getReader(dep.shuffleHandle, split.index, split.index + 1, context, metrics) // 返回一个BlockStoreShuffleReader    .read().asInstanceOf[Iterator[(K, C)]]}

ShuffleManager通过调用BlockStoreShuffleReader.read返回一个迭代器Iterator[(K, C)]。在BlockStoreShuffleReader.read方法中,首先得到一个ShuffleBlockFetcherIterator

// BlockStoreShuffleReader.scalaoverride def read(): Iterator[Product2[K, C]] = {  val wrappedStreams = new ShuffleBlockFetcherIterator(    ...    ) // 返回一个ShuffleBlockFetcherIterator    .toCompletionIterator // 返回一个Iterator[(BlockId, InputStream)]

ShuffleBlockFetcherIteratorfetchUpToMaxBytes()fetchLocalBlocks()分别读取 remote 和 local 的 Block。在拉取远程数据的时候,会统计bytesInFlightreqsInFlight等信息,并使用maxBytesInFlightmaxReqsInFlight节制。同时,为了允许 5 个并发同时拉取数据,还会设置targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)去请求每次拉取数据的最大大小。通过ShuffleBlockFetcherIterator.splitLocalRemoteBytes,现在改名叫partitionBlocksByFetchMode函数,可以将 blocks 分为 Local 和 Remote 的。关于两个函数的具体实现,将单独讨论。

  val serializerInstance = dep.serializer.newInstance()  // Create a key/value iterator for each stream  val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>    serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator  }  // Update the context task metrics for each record read.  // CompletionIterator相比普通的Iterator的区别就是在结束之后会调用一个completion函数  // CompletionIterator通过它伴生对象的apply方法创建,传入第二个参数即completionFunction  val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](    recordIter.map { record =>      readMetrics.incRecordsRead(1)      record    },    context.taskMetrics().mergeShuffleReadMetrics())  // An interruptible iterator must be used here in order to support task cancellation  val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter "(Any, Any)")  ...

经过一系列转换,我们得到一个interruptibleIter。接下来,根据是否有 mapSideCombine 对它进行聚合。这里的dep来自于BaseShuffleHandle对象,它是一个ShuffleDependency。在前面 Spark 的任务调度中已经提到,ShuffleDependency就是宽依赖。

// BlockStoreShuffleReader.scala  ...  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {    if (dep.mapSideCombine) {      // We are reading values that are already combined      val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]      dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)    } else {      // We don't know the value type, but also don't care -- the dependency *should*      // have made sure its compatible w/ this aggregator, which will convert the value      // type to the combined type C      val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]      dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)    }  } else {    interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]  }

这里的aggregatorAggregator[K, V, C],这里的KVC与熟悉combineByKey的是一样的。需要注意的是,在 combine 的过程中借助了ExternalAppendOnlyMap,这是之前提到的在 Spark 0.9 中引入的重要特性。通过调用insertAll方法能够将interruptibleIter内部的数据添加到ExternalAppendOnlyMap中,并在之后更新 MemoryBytesSpilled、DiskBytesSpilled、PeakExecutionMemory 三个统计维度,这也是我们在 Event Log 中所看到的统计维度。

// Aggregator.scalacase class Aggregator[K, V, C] (    createCombiner: V => C,    mergeValue: (C, V) => C,    mergeCombiners: (C, C) => C) {  def combineValuesByKey(      iter: Iterator[_ <: Product2[K, V]],      context: TaskContext): Iterator[(K, C)] = {    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners "K, V, C")    combiners.insertAll(iter)    updateMetrics(context, combiners)    combiners.iterator  }  def combineCombinersByKey(      iter: Iterator[_ <: Product2[K, C]],      context: TaskContext): Iterator[(K, C)] = {    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners "K, C, C")    // 同上  }  /** Update task metrics after populating the external map. */  private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {    Option(context).foreach { c =>      c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)      c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)      c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)    }  }}

在获得 Aggregate 迭代器之后,最后一步,我们要进行排序,这时候就需要用到ExternalSorter这个对象。

// BlockStoreShuffleReader.scala...  val resultIter = dep.keyOrdering match {    case Some(keyOrd: Ordering[K]) =>      val sorter = new ExternalSorter[K, C, C](context, ordering = Some(keyOrd "K, C, C"), serializer = dep.serializer)      sorter.insertAll(aggregatedIter)      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)      // Use completion callback to stop sorter if task was finished/cancelled.      context.addTaskCompletionListener[Unit](_ => {        sorter.stop()      })      CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop( "Product2[K, C], Iterator[Product2[K, C]]"))    case None =>      aggregatedIter  }

ExternalAppendOnlyMap 和 AppendOnlyMap

我们查看ExternalAppendOnlyMap的实现。ExternalAppendOnlyMap拥有一个currentMap管理在内存中存储的键值对们。和一个DiskMapIterator的数组spilledMaps,表示 Spill 到磁盘上的键值对们。

@volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C]private val spilledMaps = new ArrayBuffer[DiskMapIterator]

先来看currentMap,它是一个SizeTrackingAppendOnlyMap。这个东西实际上就是一个AppendOnlyMap,不过给它加上了统计数据大小的功能,主要是借助于SizeTrackerafterUpdateresetSamples两个方法。我们知道非序列化对象在内存存储上是不连续的,我们需要通过遍历迭代器才能知道对象的具体大小,而这个开销是比较大的。因此,通过SizeTracker我们可以得到一个内存空间占用的估计值,从来用来判定是否需要 Spill。
下面,我们来看currentMap.insertAll这个方法

// AppendOnlyMap.scaladef insertAll(entries: Iterator[Product2[K, V]]): Unit = {  if (currentMap == null) {    throw new IllegalStateException(      "Cannot insert new elements into a map after calling iterator")  }  // 我们复用update函数,从而避免每一次都创建一个新的闭包(编程环境这么恶劣的么。。。)  var curEntry: Product2[K, V] = null  val update: (Boolean, C) => C = (hadVal, oldVal) => {    if (hadVal)      // 如果不是第一个V,就merge      // mergeValue: (C, V) => C,      mergeValue(oldVal, curEntry._2)    else      // 如果是第一个V,就新建一个C      // createCombiner: V => C,      createCombiner(curEntry._2)  }  while (entries.hasNext) {    curEntry = entries.next()    val estimatedSize = currentMap.estimateSize()    if (estimatedSize > _peakMemoryUsedBytes) {      _peakMemoryUsedBytes = estimatedSize    }    if (maybeSpill(currentMap, estimatedSize)) {      // 如果发生了Spill,就重新创建一个currentMap      currentMap = new SizeTrackingAppendOnlyMap[K, C]    }    // key: K, updateFunc: (Boolean, C) => C    currentMap.changeValue(curEntry._1, update)    addElementsRead()  }}

可以看出,在insertAll中主要做了两件事情:

  1. 遍历curEntry <- entries,并通过传入的update函数进行 Combine 在内部存储上,AppendOnlyMap,包括后面将看到的一些其他 KV 容器,都倾向于将(K, V)对放到哈希表的相邻两个位置,这样的好处应该是避免访问时再进行一次跳转。

    下面的代码是AppendOnlyMap.changeValue的实现,它接受一个updateFunc用来更新一个指定K的值。updateFunc接受第一个布尔值,用来表示是不是首次出现这个 key。我们需要注意,AppendOnlyMap里面 null 是一个合法的键,但同时null又作为它里面的哈希表的默认填充,因此它里面有个对null特殊处理的过程。

    // AppendOnlyMap.scala// 这里的nullValue和haveNullValue是用来单独处理k为null的情况的,下面会详细说明private var haveNullValue = false// 有关null.asInstanceOf[V]的花里胡哨的语法,详见 https://stackoverflow.com/questions/10749010/if-an-int-cant-be-null-what-does-null-asinstanceofint-meanprivate var nullValue: V = null.asInstanceOf[V]def changeValue(key: K, updateFunc: (Boolean, V) => V): V = { // updateFunc就是从insertAll传入的update assert(!destroyed, destructionMessage) val k = key.asInstanceOf[AnyRef] if (k.eq(null)) {   if (!haveNullValue) {     // 如果这时候还没有null的这个key,就新创建一个     incrementSize()   }   nullValue = updateFunc(haveNullValue, nullValue)   haveNullValue = true   return nullValue } var pos = rehash(k.hashCode) & mask var i = 1 while (true) {   // 乘以2的原因是他按照K1 V1 K2 V2这样放的   val curKey = data(2 * pos)   if (curKey.eq(null)) {     // 如果对应的key不存在,就新创建一个     // 这也是为什么前面要单独处理null的原因,这里的null被用来做placeholder了     // 可以看到,第一个参数传的false,第二个是花里胡哨的null     val newValue = updateFunc(false, null.asInstanceOf[V])     data(2 * pos) = k     data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]     incrementSize()     return newValue   } else if (k.eq(curKey) || k.equals(curKey)) { // 又是从Java继承下来的花里胡哨的特性     val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])     data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]     return newValue   } else {     // 再散列     val delta = i     pos = (pos + delta) & mask     i += 1   } } null.asInstanceOf[V] // Never reached but needed to keep compiler happy}
  2. 估计currentMap的当前大小,并调用currentMap.maybeSpill向磁盘 Spill。我们将在单独的章节论述SizeTracker如何估计集合大小,先看具体的 Spill 过程,可以梳理出shouldSpill==true的情况
    1、 elementsRead % 32 == 0
    2、 currentMemory >= myMemoryThreshold
    3、 通过acquireMemory请求的内存不足以扩展到2 * currentMemory的大小,关于这一步骤已经在内存管理部分详细说明了,在这就不详细说了

    // Spillable.scalaprotected def maybeSpill(collection: C, currentMemory: Long): Boolean = { var shouldSpill = false if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {   val amountToRequest = 2 * currentMemory - myMemoryThreshold   // 调用对应MemoryConsumer的acquireMemory方法   val granted = acquireMemory(amountToRequest)   myMemoryThreshold += granted   shouldSpill = currentMemory >= myMemoryThreshold } shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold ...// MemoryConsumer.scalapublic long acquireMemory(long size) { long granted = taskMemoryManager.acquireExecutionMemory(size, this); used += granted; return granted;}

    下面就是真正 Spill 的过程了,其实就是调用 spill 函数。注意_memoryBytesSpilled就是我们在 Event Log 里面看到的 Memory Spill 的统计量,他表示在 Spill 之后我们能够释放多少内存:

    // Spillable.scala ... // Actually spill if (shouldSpill) {   _spillCount += 1 // 统计Spill的次数   logSpillage(currentMemory)   spill(collection)   _elementsRead = 0 // 重置强制Spill计数器_elementsRead   _memoryBytesSpilled += currentMemory   releaseMemory() } shouldSpill}

insertAll之后,会返回一个迭代器,我们查看相关方法。可以发现如果spilledMaps都是空的,也就是没有 Spill 的话,就返回内存里面currentMapiterator,否则就返回一个ExternalIterator

对于第一种情况,会用SpillableIterator包裹一下。这个类在很多地方有定义,包括ExternalAppendOnlyMap.scala,ExternalSorter.scala里面。在当前使用的实现中,它实际上就是封装了一下Iterator,使得能够 spill,转换成CompletionIterator等。

对于第二种情况,ExternalIterator比较有趣,将在稍后进行讨论。

// ExternalAppendOnlyMap.scalaoverride def iterator: Iterator[(K, C)] = {  ...  if (spilledMaps.isEmpty) {    // 如果没有发生Spill    destructiveIterator(currentMap.iterator)  } else {    // 如果发生了Spill    new ExternalIterator()  }}def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {  readingIterator = new SpillableIterator(inMemoryIterator)  readingIterator.toCompletionIterator}

currentMap.iterator实际上就是一个朴素无华的迭代器的实现。

// AppendOnlyMap.scaladef nextValue(): (K, V) = {  if (pos == -1) {    // Treat position -1 as looking at the null value    if (haveNullValue) {      return (null.asInstanceOf[K], nullValue)    }    pos += 1  }  while (pos < capacity) {    if (!data(2 * pos).eq(null)) {      return (data(2 * pos).asInstanceOf[K], data(2 * pos + 1).asInstanceOf[V])    }    pos += 1  }  null}

ExternalSorter

ExternalSorter的作用是对输入的(K, V)进行排序,以产生新的(K, C)对,排序过程中可选择进行 combine,否则输出的C == V。需要注意的是ExternalSorter不仅被用在 Shuffle Read 端,也被用在了 Shuffle Write 端,所以在后面会提到 Map-side combine 的概念。ExternalSorter具有如下的参数,在给定ordering之后,ExternalSorter就会按照它来排序。在 Spark 源码中建议如果希望进行 Map-side combining 的话,就指定ordering,否则就可以设置orderingnull

private[spark] class ExternalSorter[K, V, C](    context: TaskContext,    aggregator: Option[Aggregator[K, V, C]] = None,    partitioner: Option[Partitioner] = None,    ordering: Option[Ordering[K]] = None,    serializer: Serializer = SparkEnv.get.serializer)  extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager( "WritablePartitionedPairCollection[K, C]"))

由于ExternalSorter支持有 combine 和没有 combine 的两种模式,因此对应设置了两个对象。map = new PartitionedAppendOnlyMap[K, C],以及buffer = new PartitionedPairBuffer[K, C]。其中,PartitionedAppendOnlyMap就是一个SizeTrackingAppendOnlyMapPartitionedPairBuffer则继承了WritablePartitionedPairCollection,由于不需要按照 key 进行 combine,所以它的实现接近于一个 Array。

ExternalSorter.insertAll方法和之前看到的ExternalAppendOnlyMap方法是大差不差的,他也会对可以聚合的特征进行聚合,并且 TODO 上还说如果聚合之后的 reduction factor 不够明显,就停止聚合。

相比之前的 aggregator,ExternalSorter不仅能 aggregate,还能 sort。ExternalSorter在 Shuffle Read 和 Write 都有使用,而ExternalAppendOnlyMap只有在 Shuffle Read 中使用。所以为啥不直接搞一个ExternalSorter而是还要在前面垫一个ExternalAppendOnlyMap呢?为此,我们总结比较一下这两者:

首先,在insertAll时,ExternalAppendOnlyMap是一定要做 combine 的,而ExternalSorter可以选择是否做 combine,为此还有PartitionedAppendOnlyMapPartitionedPairBuffer两种数据结构。

其次,在做排序时,ExternalAppendOnlyMap默认对内存中的对象不进行排序,只有当要 Spill 的时候才会返回AppendOnlyMap.destructiveSortedIterator的方式将内存里面的东西有序写入磁盘。在返回迭代器时,如果没有发生 Spill,那么ExternalAppendOnlyMap返回没有经过排序的currentMap,否则才通过ExternalIterator进行排序。而对ExternalSorter而言排序与否在于有没有指定ordering。如果进行排序的话,那么它会首先考虑 Partition,再考虑 Key。

ExternalIterator

下面我们来看ExternalAppendOnlyMapExternalIterator的实现。它是一个典型的外部排序的实现,有一个 PQ 用来 merge。不过这次的迭代器换成了destructiveSortedIterator,也就是我们都是排序的了。这个道理也是显而易见的,不 sort 一下,我们怎么和硬盘上的数据做聚合呢?

// ExternalAppendOnlyMap.scalaval mergeHeap = new mutable.PriorityQueue[StreamBuffer]val sortedMap = destructiveIterator(currentMap.destructiveSortedIterator(keyComparator))// 我们得到一个Array的迭代器val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)inputStreams.foreach { it =>  val kcPairs = new ArrayBuffer[(K, C)]  // 读完所有具有所有相同hash(key)的序列,并创建一个StreamBuffer  // 需要注意的是,由于哈希碰撞的原因,里面可能有多个key  readNextHashCode(it, kcPairs)  if (kcPairs.length > 0) {    mergeHeap.enqueue(new StreamBuffer(it, kcPairs))  }}

我们先来看看destructiveSortedIterator的实现

// AppendOnlyMap.scaladef destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {  destroyed = true  var keyIndex, newIndex = 0  // 下面这个循环将哈希表里面散乱的KV对压缩到最前面  while (keyIndex < capacity) {    if (data(2 * keyIndex) != null) {      data(2 * newIndex) = data(2 * keyIndex)      data(2 * newIndex + 1) = data(2 * keyIndex + 1)      newIndex += 1    }    keyIndex += 1  }  assert(curSize == newIndex + (if (haveNullValue) 1 else 0))  new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)  // 这下面和前面实现大差不差,就省略了  new Iterator[(K, V)] {    ...  }}

下面我们来看看实现的next()接口函数,它是外部排序中的一个典型的归并过程。我们需要注意的是minBuffer是一个StreamBuffer,维护一个hash(K), VArrayBuffer,类似H1 V1 H1 V2 H2 V3这样的序列,而不是我们想的(K, V)流。因此其中是可能有哈希碰撞的。我们从mergeHeapdequeue出来的StreamBuffer是当前hash(K)最小的所有K的集合。

override def next(): (K, C) = {  if (mergeHeap.isEmpty) {    // 如果堆是空的,就再见了    throw new NoSuchElementException  }  // Select a key from the StreamBuffer that holds the lowest key hash  // mergeHeap选择所有StreamBuffer中最小hash的,作为minBuffer  val minBuffer = mergeHeap.dequeue()  // minPairs是一个ArrayBuffer[T],表示这个StreamBuffer维护的所有KV对  val minPairs = minBuffer.pairs  val minHash = minBuffer.minKeyHash  // 从一个ArrayBuffer[T]中移出Index为0的项目  val minPair = removeFromBuffer(minPairs, 0)  // 得到非哈希的 (minKey, minCombiner)  val minKey = minPair._1  var minCombiner = minPair._2  assert(hashKey(minPair) == minHash)  // For all other streams that may have this key (i.e. have the same minimum key hash),  // merge in the corresponding value (if any) from that stream  val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer "StreamBuffer")  while (mergeHeap.nonEmpty && mergeHeap.head.minKeyHash == minHash) {    val newBuffer = mergeHeap.dequeue()    // 如果newBuffer的key和minKey相等的话(考虑哈希碰撞),就合并    minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)    mergedBuffers += newBuffer  }  // Repopulate each visited stream buffer and add it back to the queue if it is non-empty  mergedBuffers.foreach { buffer =>    if (buffer.isEmpty) {      readNextHashCode(buffer.iterator, buffer.pairs)    }    if (!buffer.isEmpty) {      mergeHeap.enqueue(buffer)    }  }  (minKey, minCombiner)}

SizeTracker

首先在每次集合更新之后,会调用afterUpdate,当到达采样的 interval 之后,会takeSample

// SizeTracker.scalaprotected def afterUpdate(): Unit = {  numUpdates += 1  if (nextSampleNum == numUpdates) {    takeSample()  }}

takeSample函数中第一句话就涉及多个对象,一个一个来看。

// SizeTracker.scalaprivate def takeSample(): Unit = {  samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))  ...

SizeEstimator.estimate的实现类似去做一个 state 队列上的 BFS。

private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {  val state = new SearchState(visited)  state.enqueue(obj)  while (!state.isFinished) {    visitSingleObject(state.dequeue(), state)  }  state.size}

visitSingleObject来具体做这个 BFS,会特殊处理 Array 类型。我们不处理反射,因为反射包里面会引用到很多全局反射对象,这个对象又会应用到很多全局的大对象。同理,我们不处理 ClassLoader,因为它里面会应用到整个 REPL。反正 ClassLoaders 和 Classes 是所有对象共享的。

private def visitSingleObject(obj: AnyRef, state: SearchState): Unit = {  val cls = obj.getClass  if (cls.isArray) {    visitArray(obj, cls, state)  } else if (cls.getName.startsWith("scala.reflect")) {  } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {    // Hadoop JobConfs created in the interpreter have a ClassLoader.  } else {    obj match {      case s: KnownSizeEstimation =>        state.size += s.estimatedSize      case _ =>        val classInfo = getClassInfo(cls)        state.size += alignSize(classInfo.shellSize)        for (field <- classInfo.pointerFields) {          state.enqueue(field.get(obj))        }    }  }}

然后我们创建一个Sample,并且放到队列samples

private object SizeTracker {  case class Sample(size: Long, numUpdates: Long)}

下面的主要工作就是计算一个bytesPerUpdate

  ...  // Only use the last two samples to extrapolate  // 如果sample太多了,就删除掉一些  if (samples.size > 2) {    samples.dequeue()  }  val bytesDelta = samples.toList.reverse match {    case latest :: previous :: tail =>      (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)    // If fewer than 2 samples, assume no change    case _ => 0  }  bytesPerUpdate = math.max(0, bytesDelta)  nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong}

我们统计到上次估算之后经历的 update 数量,并乘以bytesPerUpdate,即可得到总大小

// SizeTracker.scaladef estimateSize(): Long = {  assert(samples.nonEmpty)  val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)  (samples.last.size + extrapolatedDelta).toLong}

Shuffle Write 端源码分析

Shuffle Write 端的实现主要依赖ShuffleManager中的ShuffleWriter对象,目前使用的ShuffleManagerSortShuffleManager,因此只讨论它。它是一个抽象类,主要有SortShuffleWriterUnsafeShuffleWriterBypassMergeSortShuffleWriter等实现。

SortShuffleWriter

private[spark] abstract class ShuffleWriter[K, V] {  /** Write a sequence of records to this task's output */  @throws[IOException]  def write(records: Iterator[Product2[K, V]]): Unit  /** Close this writer, passing along whether the map completed */  def stop(success: Boolean): Option[MapStatus]}

SortShuffleWriter的实现可以说很简单了,就是将records放到一个ExternalSorter里面,然后创建一个ShuffleMapOutputWritershuffleExecutorComponents实际上是一个LocalDiskShuffleExecutorComponentsShuffleMapOutputWriter是一个 Java 接口,实际上被创建的是LocalDiskShuffleMapOutputWriter

// SortShuffleWriteroverride def write(records: Iterator[Product2[K, V]]): Unit = {  sorter = if (dep.mapSideCombine) {    new ExternalSorter[K, V, C](      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)  } else {    // 如果不需要进行mapSideCombine,那么我们传入空的aggregator和ordering,    // 我们在map端不负责对key进行排序,统统留给reduce端吧    new ExternalSorter[K, V, V](      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)  }  sorter.insertAll(records)  // Don't bother including the time to open the merged output file in the shuffle write time,  // because it just opens a single file, so is typically too fast to measure accurately  // (see SPARK-3570).  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(    dep.shuffleId, mapId, dep.partitioner.numPartitions)  ...

紧接着,调用ExternalSorter.writePartitionedMapOutput将自己维护的map或者buffer(根据是否有 Map Side Aggregation)写到mapOutputWriter提供的partitionWriter里面。其过程用到了一个叫destructiveSortedWritablePartitionedIterator的迭代器,相比destructiveSortedIterator,它是多了 Writable 和 Partitioned 两个词。前者的意思是我可以写到文件,后者的意思是我先按照 partitionId 排序,然后在按照给定的 Comparator 排序。

接着就是commitAllPartitions,这个函数调用writeIndexFileAndCommit

//  ...  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)  val partitionLengths = mapOutputWriter.commitAllPartitions()

MapStatus被用来保存 Shuffle Write 操作的 metadata。

...  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)}// LocalDiskShuffleMapOutputWriter.java@Overridepublic long[] commitAllPartitions() throws IOException {  ...  cleanUp();  File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;  blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);  return partitionLengths;}

writeIndexFileAndCommit负责为传入的文件dataTmp创建一个索引文件,并原子地提交。注意到,到当前版本,每一个执行单元只会生成一份数据文件和一份索引。

// IndexShuffleBlockResolver.javadef writeIndexFileAndCommit(shuffleId: Int, mapId: Long, lengths: Array[Long], dataTmp: File): Unit

根据writeIndexFileAndCommit的注释,getBlockData会来读它写的块,这个getBlockData同样位于我们先前介绍过的IndexShuffleBlockResolver类中。

Reference

  • https://docs.scala-lang.org/z…
  • https://zhuanlan.zhihu.com/p/…
  • https://fangjian0423.github.i…
  • https://www.cnblogs.com/xia52…
  • https://spark.apache.org/docs…
  • https://blog.csdn.net/bluishg…
  • https://stackoverflow.com/que…
  • https://intellipaat.com/blog/…
  • https://indatalabs.com/blog/c…
  • https://tech.meituan.com/20+/…
  • https://endymecy.gitbooks.io/…
  • https://forums.databricks.com…
  • https://stackoverflow.com/que…
  • https://litaotao.github.io/bo…
  • https://www.iteblog.com/archi…
  • https://vimsky.com/article/27…
  • https://scastie.scala-lang.org/
  • https://www.jianshu.com/p/5c2…
  • https://www.cnblogs.com/nowgo…
  • https://stackoverflow.com/que…
  • https://stackoverflow.com/que…
  • https://stackoverflow.com/que…
  • https://stackoverflow.com/que…
  • https://blog.csdn.net/dabokel…
  • https://blog.csdn.net/zrc+902…
  • https://stackoverflow.com/que…
  • https://twitter.github.io/scala\_school/zh\_cn/advanced-types.html
  • https://colobu.com/20+/05/+/V…
  • https://www.cnblogs.com/fillP…
  • https://issues.scala-lang.org…
  • https://stackoverflow.com/que…
  • http://www.calvinneo.com/2019…
  • http://www.calvinneo.com/2019…
  • 深入理解 SPARK:核心思想与源码分析
  • http://www.calvinneo.com/2019…
  • https://zhuanlan.zhihu.com/p/…
  • http://www.jasongj.com/spark/…
  • https://www.kancloud.cn/kancl…
  • https://www.jianshu.com/p/4c5…
  • http://jerryshao.me/20+/0+04/…
  • https://github.com/hustnn/Tun…
  • https://jaceklaskowski.gitboo…
  • https://blog.k2datascience.co…
  • https://stackoverflow.com/que…
  • https://0x0fff.com/spark-arch…
  • https://0x0fff.com/spark-memo…
  • https://www.slideshare.net/da…
  • https://www.linuxprobe.com/wp…

推荐阅读:

  • Spark源码和调优简介 Spark Core(上)
    • [Spark源码和调优简介 Spark Core(中)]()
  • 十种图像模糊算法的总结与实现

更多腾讯AI相关技术干货,请关注专栏腾讯技术工程

发表评论

邮箱地址不会被公开。 必填项已用*标注