scala> val rdd = sc.parallelize(0 to 9)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.getNumPartitions
res0: Int = 8
// ShuffledRDD and coalesce Example
scala> rdd.coalesce(numPartitions = 4, shuffle = true).toDebugString
res1: String =
(4) MapPartitionsRDD[4] at coalesce at <console>:27 []
 |  CoalescedRDD[3] at coalesce at <console>:27 []
 |  ShuffledRDD[2] at coalesce at <console>:27 []
 +-(8) MapPartitionsRDD[1] at coalesce at <console>:27 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
// ShuffledRDD and sortByKey Example
scala> val grouped = rdd.groupBy(_ % 2)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupBy at <console>:26
scala> grouped.sortByKey(numPartitions = 2).toDebugString
res2: String =
(2) ShuffledRDD[9] at sortByKey at <console>:29 []
 +-(8) ShuffledRDD[6] at groupBy at <console>:26 []
    +-(8) MapPartitionsRDD[5] at groupBy at <console>:26 []
       |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
ShuffledRDD
ShuffledRDD is an RDD of key-value pairs that represents the shuffle step in an RDD lineage. It uses custom ShuffledRDDPartition partitions.
A ShuffledRDD is created for RDD transformations that trigger a data shuffle:

- coalesce transformation (with the shuffle flag enabled)
- PairRDDFunctions's combineByKeyWithClassTag and partitionBy (when the parent RDD's Partitioner is different from the one specified; see the sketch after this list)
- OrderedRDDFunctions's sortByKey and repartitionAndSortWithinPartitions ordered operators
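For instance, partitionBy only introduces a ShuffledRDD when the requested Partitioner actually differs from the parent's. A minimal sketch, assuming a spark-shell session (so sc is available):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(0 to 9).map(n => (n, n))    // the parent has no Partitioner
val byHash = pairs.partitionBy(new HashPartitioner(4)) // new ShuffledRDD
val same = byHash.partitionBy(new HashPartitioner(4))  // an equal Partitioner this time
assert(same eq byHash) // no new ShuffledRDD: partitionBy returned the parent as-is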
ShuffledRDD takes a parent RDD and a Partitioner when created.
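A minimal sketch of creating one directly; ordinarily the transformations above do it for you (ShuffledRDD is a @DeveloperApi class, and a spark-shell session is assumed):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.ShuffledRDD

val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c")))
// The types K = Int, V = String, C = String (no map-side combine involved here)
val shuffled = new ShuffledRDD[Int, String, String](pairs, new HashPartitioner(2))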
getDependencies returns a single-element collection of RDD dependencies with a ShuffleDependency (with the Serializer chosen according to the map-side combine internal flag).
Map-Side Combine mapSideCombine Internal Flag
mapSideCombine: Boolean
mapSideCombine internal flag is used to select the Serializer (for shuffling) when ShuffleDependency is created (which is the one and only Dependency of a ShuffledRDD).
Note: mapSideCombine is only used when the userSpecifiedSerializer optional Serializer is not specified explicitly (which is the default).
Note: mapSideCombine uses SparkEnv to access the current SerializerManager.
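As an illustration of what sets the flag (a sketch, assuming a spark-shell session): reduceByKey goes through combineByKeyWithClassTag and enables map-side combine on the underlying ShuffledRDD, while groupByKey explicitly disables it (combining before the shuffle would not reduce the data):

val pairs = sc.parallelize(Seq((1, 1), (1, 2), (2, 3)))

pairs.reduceByKey(_ + _) // mapSideCombine = true; K = Int, V = Int, C = Int
pairs.groupByKey()       // mapSideCombine = false; values are shuffled as-is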
If enabled (i.e. true), mapSideCombine directs getDependencies to find the Serializer for the types K and C. Otherwise, getDependencies finds the Serializer for the types K and V.
Note: The types K, C and V are specified when ShuffledRDD is created.
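Put together, a simplified sketch of getDependencies (inside ShuffledRDD) based on the description above, not necessarily the verbatim Spark source:

override def getDependencies: Seq[Dependency[_]] = {
  // Use the explicit Serializer if given; otherwise choose one by mapSideCombine
  val serializer = userSpecifiedSerializer.getOrElse {
    val serializerManager = SparkEnv.get.serializerManager
    if (mapSideCombine) {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[C]])
    } else {
      serializerManager.getSerializer(implicitly[ClassTag[K]], implicitly[ClassTag[V]])
    }
  }
  // The one and only Dependency of a ShuffledRDD
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}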
Computing Partition (in TaskContext) — compute Method
compute(split: Partition, context: TaskContext): Iterator[(K, C)]
Note: compute is part of the RDD contract to compute a given partition in a TaskContext.
Internally, compute makes sure that its one and only dependency is a ShuffleDependency. It then requests the ShuffleManager for a ShuffleReader to read the key-value pairs (as Iterator[(K, C)]) for the split.
Note: compute uses SparkEnv to access ShuffleManager.
Note: A Partition has the index property to specify startPartition and endPartition partition offsets.
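A simplified sketch of compute based on the description above (not necessarily the verbatim Spark source); note how the partition's index provides both offsets:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  // The one and only dependency of a ShuffledRDD is a ShuffleDependency
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // Request a ShuffleReader for this partition's range and read the records back
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}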
Finding Preferred Locations for Partition — getPreferredLocations Method
getPreferredLocations(partition: Partition): Seq[String]
Note: getPreferredLocations is part of the RDD contract to specify placement preferences (aka preferred task locations), i.e. where tasks should be executed to be as close to the data as possible.
Internally, getPreferredLocations requests MapOutputTrackerMaster for the preferred locations, i.e. BlockManagers with the most map outputs, for the input partition (of the one and only ShuffleDependency).
Note: getPreferredLocations uses SparkEnv to access MapOutputTrackerMaster (which runs on the driver).
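A simplified sketch of getPreferredLocations based on the description above (not necessarily the verbatim Spark source):

override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  // MapOutputTrackerMaster is only available on the driver
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  // BlockManagers with the most map outputs for this reduce partition
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}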
ShuffledRDDPartition
ShuffledRDDPartition gets an index when it is created (which is the partition index as computed by the Partitioner of the ShuffledRDD).
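A sketch of what such a partition amounts to; the class itself is private[spark] in Spark:

import org.apache.spark.Partition

class ShuffledRDDPartition(val idx: Int) extends Partition {
  // The partition's position in the ShuffledRDD, as assigned by the Partitioner
  override val index: Int = idx
}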