Sorting in Spark is a multiphase process that requires shuffling (sketched in code below):

- the input RDD is sampled, and the sample is used to compute the boundaries of each output partition (`sample` followed by `collect`)
- the input RDD is partitioned using a `RangePartitioner` with the boundaries computed in the first step (`partitionBy`)
- each partition from the second step is sorted locally (`mapPartitions`)
When the data is collected, all that is left is to follow the order defined by the partitioner.
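The same three phases can be reproduced by hand with the public RDD API. The sketch below is a simplified illustration, not the actual sortBy implementation (internally sortBy keys the records and delegates to sortByKey, which folds the local sort into the shuffle read), but the shape of the computation is the same:

import org.apache.spark.RangePartitioner

val data = sc.parallelize(Seq(4, 2, 5, 3, 1)).map(x => (x, x))

// Steps 1-2: RangePartitioner samples the parent RDD to compute range
// boundaries, then partitionBy shuffles each record into its range.
val partitioner = new RangePartitioner(data.partitions.length, data)
val ranged = data.partitionBy(partitioner)

// Step 3: sort every partition locally. The partitions are already
// ordered relative to each other, so this completes the global order.
val sorted = ranged.mapPartitions(
  iter => iter.toArray.sortBy(_._1).iterator,
  preservesPartitioning = true)

sorted.keys.collect() // Array(1, 2, 3, 4, 5)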
The above steps are clearly reflected in the debug string:
scala> val rdd = sc.parallelize(Seq(4, 2, 5, 3, 1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at ...
scala> rdd.sortBy(identity).toDebugString
res1: String =
(6) MapPartitionsRDD[10] at sortBy at <console>:24 [] // Sort partitions
 |  ShuffledRDD[9] at sortBy at <console>:24 [] // Shuffle
 +-(8) MapPartitionsRDD[6] at sortBy at <console>:24 [] // Pre-shuffle steps
    |  ParallelCollectionRDD[0] at parallelize at <console>:21 [] // Parallelize
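To confirm that the output respects both orders, you can dump the contents of each output partition with glom (just a sanity check; glom materializes every partition as an array, so use it only on small data):

scala> rdd.sortBy(identity).glom().collect().foreach(p => println(p.mkString(", ")))

Each printed line is one partition: elements within a partition are sorted, and every partition's range precedes the next, so concatenating them in partition order yields the globally sorted result.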