Timsort是spark中用作外部排序的机制。一个典型的应用是在spark sql中用来做Order操作的实现。Order时候将行记录插入到ExternalSorter中,ExternalSorter用timsort排序数组,返回排序后的Iterator。
spark sql的物理计划中,排序Sort属于agg相关的聚合操作。相关的类有:SortAggregateExec、SortBasedAggregationIterator、SortExec等。
-
- SortAggregateExec
排序后数据的聚合操作。构造方法和入参如下:
case class SortAggregateExec(
requiredChildDistributionExpressions: Option[Seq[Expression]],
groupingExpressions: Seq[NamedExpression],
aggregateExpressions: Seq[AggregateExpression],
aggregateAttributes: Seq[Attribute],
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
child: SparkPlan)
extends UnaryExecNode
物理执行通过doExecute(): RDD[InternalRow]方法。主要代码:
val outputIter = new SortBasedAggregationIterator(
groupingExpressions,
child.output,
iter,
aggregateExpressions,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
(expressions, inputSchema) =>
newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),
numOutputRows)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
numOutputRows += 1
Iterator[UnsafeRow](outputIter.outputForEmptyGroupingKeyWithoutInput())
} else {
outputIter
}
通过构造SortBasedAggregationIterator迭代器来生成聚合后的数据迭代。将聚合前的数据迭代器作为入参传入SortBasedAggregationIterator中。
-
- SortExec
真正执行外部排序的类。 定义:
case class SortExec(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan,
testSpillFrequency: Int = 0)
extends UnaryExecNode with CodegenSupport
child不用说自然是子执行计划。
testSpillFrequency表示是否阶段性的spill数据到磁盘,Int型表示每隔多少条数据就spill到磁盘。一般在测试环境下使用。
sortOrder是排序的字段属性。
global表示是否全局排序,如果全局排序的话一般需要先将各分区的数据打散shuffle,然后再执行排序。
-
-
- 关键方法createSorter
-
def createSorter(): UnsafeExternalRowSorter
生成外部排序类,然后对原始数据的每行数据,插入到外部排序类,最后外部排序类返回排序后的迭代器Iterator。
protected override def doExecute(): RDD[InternalRow] = {
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val sortTime = longMetric("sortTime")
child.execute().mapPartitionsInternal { iter =>
val sorter = createSorter()
val metrics = TaskContext.get().taskMetrics()
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
sortTime += sorter.getSortTimeNanos / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
sortedIterator
}
}
利用UnsafeExternalRowSorter生成排序后的Iterator。
-
-
- UnsafeExternalRowSorter
-
UnsafeExternalRowSorter在Spark-catalyst包里。路径sql/execution/ UnsafeExternalRowSorter。
它又使用UnsafeExternalSorter作为内部排序迭代器。UnsafeExternalRowSorter本身的逻辑不复杂,主要是封装了UnsafeExternalSorter来排序。它将原始数据插入到UnsafeExternalSorter中,最后获取UnsafeExternalSorter的排序迭代器。
-
-
- UnsafeExternalSorter
-
UnsafeExternalSorter在spark-core中。
路径:org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter。它是Java类。
最终它是通过Timsort来对内存数据排序的。
-
-
- Timsort
-
包名:org.apache.spark.util.collection;
MIN_MERGE:最小merge长度,如果待排序数组长度小于该值则直接用二分差值法排序,否则引用merge过程。
private final SortDataFormat<K, Buffer> s;
SortDataFormat定义了待排序数据的格式。
binarySort方法
少于32个元素的时候用binarySort排序。是简单实现。
先找出一段已经排序好的数组,lo~hi,然后从hi+1处开始知道数组的最后循环迭代。每次迭代排序lo~hi+index这么长的数组,这个数组有个特点就是前面的数据已经排序好只有最后一个元素没有排序。每次迭代过程大致如下:
找到比最后一个原色大的数组的位置start,然后复制start~hi+index到start+1~hi+index+1,然后将hi+index+1(也就是最后一个原色)复制到start位置处,完成整段数组的排序。
如果多于32个元素,则要复杂一点了。
SortState
利用SortState来做多于32个元素的排序。
/**
* March over the array once, left to right, finding natural runs,
* extending short natural runs to minRun elements, and merging runs
* to maintain stack invariant.
*/
SortState sortState = new SortState(a, c, hi - lo);
int minRun = minRunLength(nRemaining);
do {
// Identify next run
int runLen = countRunAndMakeAscending(a, lo, hi, c);
// If run is short, extend to min(minRun, nRemaining)
if (runLen < minRun) {
int force = nRemaining <= minRun ? nRemaining : minRun;
binarySort(a, lo, lo + force, lo + runLen, c);
runLen = force;
}
// Push run onto pending-run stack, and maybe merge
sortState.pushRun(lo, runLen);
sortState.mergeCollapse();
// Advance to find next run
lo += runLen;
nRemaining -= runLen;
} while (nRemaining != 0);
// Merge all remaining runs to complete sort
assert lo == hi;
sortState.mergeForceCollapse();
assert sortState.stackSize == 1;
大致过程如下:
对2段局部排序的数组,找出插入点,然后执行Range复制插入过程,一次将多个区间数据移动,这样对于2段局部排序好的数组,最多执行2-3次批量移动复制过程就可以完成整体排序。
对于内存不够放的局部排序数据,保存到多个磁盘文件,每个磁盘文件都是一个排序好的文件,这里叫UnsafeSorterSpillReader。
用UnsafeSorterSpillMerger做多个磁盘文件的排序类。每个磁盘文件作为一个文件句柄插入到PriorityQueue排序队列中,将每次取数据时从这多个排序队列中取出最小的元素,实现排序。