log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ERROR
SortShuffleWriter — Fallback ShuffleWriter
SortShuffleWriter is a ShuffleWriter that is used when SortShuffleManager returns a ShuffleWriter for ShuffleHandle (and the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter could not be used).
|
Note
|
SortShuffleWriter is parameterized by types for K keys, V values, and C combiner values.
|
| Name | Description |
|---|---|
NOTE: Since write does not return a value, |
|
Internal flag to mark that |
|
Tip
|
Enable Add the following line to Refer to Logging. |
Creating SortShuffleWriter Instance
SortShuffleWriter takes the following when created:
-
mapId— the mapper task id
|
Note
|
SortShuffleWriter is created when SortShuffleManager returns a ShuffleWriter for the fallback BaseShuffleHandle.
|
Writing Records Into Shuffle Partitioned File In Disk Store — write Method
write(records: Iterator[Product2[K, V]]): Unit
|
Note
|
write is a part of ShuffleWriter contract to write a sequence of records (for a RDD partition).
|
Internally, write creates a ExternalSorter with the types K, V, C or K, V, V depending on mapSideCombine flag of the ShuffleDependency being enabled or not, respectively.
|
Note
|
ShuffleDependency is defined when SortShuffleWriter is created (as the dependency of BaseShuffleHandle).
|
|
Note
|
write makes sure that Aggregator is defined for ShuffleDependency when mapSideCombine flag is enabled.
|
write requests IndexShuffleBlockResolver for the shuffle data output file (for the ShuffleDependency and mapId) and creates a temporary file for the shuffle data file in the same directory.
write creates a ShuffleBlockId (for the ShuffleDependency and mapId and the special IndexShuffleBlockResolver.NOOP_REDUCE_ID reduce id).
write requests IndexShuffleBlockResolver to write an index file (for the temporary partitioned file).
write creates a MapStatus (with the location of the shuffle server that serves the executor’s shuffle files and the sizes of the shuffle partitioned file’s partitions).
|
Note
|
The newly-created MapStatus is available as mapStatus internal attribute.
|
|
Note
|
write does not handle exceptions so when they occur, they will break the processing.
|
In the end, write deletes the temporary partitioned file. You may see the following ERROR message in the logs if write did not manage to do so:
ERROR Error while deleting temp file [path]
Closing SortShuffleWriter (and Calculating MapStatus) — stop Method
stop(success: Boolean): Option[MapStatus]
|
Note
|
stop is a part of ShuffleWriter contract to close itself (and return the last written MapStatus).
|
Otherwise, when stopping flag is already enabled or the input success is disabled, stop returns no MapStatus (i.e. None).
In the end, stop stops the ExternalSorter and increments the shuffle write time task metrics.