DStream — Discretized Stream
Discretized Stream (DStream) is the fundamental concept of Spark Streaming. It is basically a stream of RDDs whose elements are the data received from input streams for a batch (possibly extended in scope by windowed or stateful operators).
There is no notion of input and output dstreams. DStreams are all instances of the DStream abstract class (see DStream Contract in this document). You may however correctly assume that all dstreams are input dstreams, and that holds until you register a dstream, which marks it as an output dstream.
| Name | Initial Value | Description |
|---|---|---|
| storageLevel | StorageLevel.NONE | StorageLevel of the RDDs in the DStream. |
| restoredFromCheckpointData | false | The flag to inform whether it was restored from checkpoint. |
| graph | null | The reference to DStreamGraph. |
A DStream is represented by the org.apache.spark.streaming.dstream.DStream abstract class.
Tip: Enable DEBUG logging level for the org.apache.spark.streaming.dstream.DStream logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.streaming.dstream.DStream=DEBUG. Refer to Logging.
DStream Contract
A DStream is defined by the following properties (with the names of the corresponding methods that subclasses have to implement):
- dstream dependencies, i.e. a collection of DStreams that this DStream depends on. They are often referred to as parent dstreams.

  def dependencies: List[DStream[_]]

- slide duration (aka slide interval), i.e. a time interval after which the stream is requested to generate a RDD out of the input data it consumes.

  def slideDuration: Duration

- How to compute (generate) an optional RDD for the given batch (if any). validTime is a point in time that marks the end boundary of the slide duration.

  def compute(validTime: Time): Option[RDD[T]]
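The following is a minimal sketch of a custom dstream that satisfies the contract. It assumes the Spark 1.x Streaming APIs; the ConstantDStream name and the explicit batchInterval constructor parameter are illustrative only (Spark's own input dstreams read the batch interval from the DStreamGraph instead):

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

// A hypothetical dstream that returns the same RDD for every batch.
class ConstantDStream[T: ClassTag](
    ssc: StreamingContext,
    batchInterval: Duration, // passed in explicitly for the sake of the example
    rdd: RDD[T]) extends DStream[T](ssc) {

  // No parent dstreams
  override def dependencies: List[DStream[_]] = List.empty

  // Generate a RDD every batch interval
  override def slideDuration: Duration = batchInterval

  // There is always data to process, so always return a RDD
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}
```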
Creating DStreams
You can create dstreams through the built-in input stream constructors using streaming context or more specialized add-ons for external input data sources, e.g. Apache Kafka.
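For example, using the streaming context's built-in constructors (a sketch; the application name, host, and port are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-demo").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))

// A built-in input dstream that reads lines of text from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
```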
Note: DStreams can only be created before StreamingContext is started.
Zero Time (aka zeroTime)
Zero time (internally zeroTime) is the time when a dstream was initialized.
It serves as the initialization marker (via the isInitialized method) and helps calculate intervals for RDD checkpointing (when the checkpoint interval is set and the current batch time is a multiple thereof), slicing, and the time validation for a batch (when a dstream generates a RDD).
Remember Interval (aka rememberDuration)
Remember interval (internally rememberDuration) is the time interval for how long a dstream is supposed to remember (aka cache) the RDDs it has created. This is a mandatory attribute of every dstream which is validated at startup.
Note: It is used for metadata cleanup of a dstream.
Initially, when a dstream is created, the remember interval is not set (i.e. null), but is set when the dstream is initialized.
It can be set to a custom value using the remember method.
Note: You may see the current value of the remember interval when a dstream is validated at startup and the log level is INFO.
generatedRDDs - Internal Cache of Batch Times and Corresponding RDDs
generatedRDDs is an internal collection of pairs of batch times and the corresponding RDDs that were generated for the batch. It acts as a cache when a dstream is requested to compute a RDD for a batch (i.e. generatedRDDs may already hold the RDD, or a newly-generated RDD gets added).
generatedRDDs is empty initially, i.e. when a dstream is created.
It is a transient data structure so it is not serialized when a dstream is. It is initialized to an empty collection when deserialized. You should see the following DEBUG message in the logs when it happens:
DEBUG [the simple class name of dstream].readObject used
As new RDDs are added, dstreams offer a way to clear the old metadata during which the old RDDs are removed from generatedRDDs collection.
If checkpointing is used, generatedRDDs collection can be recreated from a storage.
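For reference, the cache is declared in DStream roughly as follows (paraphrasing the Spark 1.x sources):

```scala
// inside DStream[T] (using scala.collection.mutable.HashMap);
// @transient, so deserialization re-creates it as an empty collection
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
```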
Initializing DStreams — initialize Method
initialize(time: Time): Unit
initialize method sets the zero time, optionally the checkpoint interval (if the dstream must checkpoint and the interval was not set already), and the remember duration.
Note: initialize method is called for output dstreams only when DStreamGraph is started.
The zero time of a dstream can only be set once or be set again to the same zero time. Otherwise, it throws SparkException as follows:
ZeroTime is already initialized to [zeroTime], cannot initialize it again to [time]
It verifies that the checkpoint interval is defined when mustCheckpoint is enabled.
Note: The internal mustCheckpoint flag is disabled by default. It is set by custom dstreams like StateDStream.
If mustCheckpoint is enabled and the checkpoint interval was not set, it is automatically set to the slide interval or 10 seconds, whichever is longer. You should see the following INFO message in the logs when the checkpoint interval was set automatically:
INFO [DStreamType]: Checkpoint interval automatically set to [checkpointDuration]
It then ensures that the remember interval is at least twice the checkpoint interval (if one is defined) or at least the slide duration.
At the very end, it initializes the parent dstreams (available as dependencies) that recursively initializes the entire graph of dstreams.
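Putting it together, initialize can be sketched as follows (a simplified paraphrase of DStream.initialize from the Spark 1.x sources, not the verbatim code):

```scala
// a simplified paraphrase of DStream.initialize (Spark 1.x)
private[streaming] def initialize(time: Time): Unit = {
  if (zeroTime != null && zeroTime != time) {
    throw new SparkException(s"ZeroTime is already initialized to $zeroTime, " +
      s"cannot initialize it again to $time")
  }
  zeroTime = time

  // set the checkpoint interval to the slide duration or 10 seconds, whichever is larger
  if (mustCheckpoint && checkpointDuration == null) {
    checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
    logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
  }

  // remember RDDs at least as long as checkpointing needs them
  var minRememberDuration = slideDuration
  if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
    minRememberDuration = checkpointDuration * 2 // so the latest checkpoint is not forgotten
  }
  if (rememberDuration == null || rememberDuration < minRememberDuration) {
    rememberDuration = minRememberDuration
  }

  // recursively initialize the parent dstreams
  dependencies.foreach(_.initialize(zeroTime))
}
```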
remember Method
remember(duration: Duration): Unit
remember sets remember interval for the current dstream and the dstreams it depends on (see dependencies).
If the input duration is specified (i.e. not null), remember allows setting the remember interval (only when the current value was not set already) or extending it (when the current value is shorter).
You should see the following INFO message in the logs when the remember interval changes:
INFO Duration for remembering RDDs set to [rememberDuration] for [dstream]
At the end, remember always passes the call on to the parent dstreams (see dependencies), whether the interval was set, extended or did not change.
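The logic can be sketched as follows (a simplified paraphrase of DStream.remember from the Spark 1.x sources; parentRememberDuration defaults to rememberDuration):

```scala
// a simplified paraphrase of DStream.remember (Spark 1.x)
private[streaming] def remember(duration: Duration): Unit = {
  // set or extend the remember interval, but never shrink it
  if (duration != null && (rememberDuration == null || duration > rememberDuration)) {
    rememberDuration = duration
    logInfo(s"Duration for remembering RDDs set to $rememberDuration for $this")
  }
  // propagate to the parent dstreams
  dependencies.foreach(_.remember(parentRememberDuration))
}
```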
Checkpointing DStreams — checkpoint Method
checkpoint(interval: Duration): DStream[T]
You use the checkpoint(interval: Duration) method to set up periodic checkpointing every (checkpoint) interval.
You can only enable checkpointing and set the checkpoint interval before StreamingContext is started or UnsupportedOperationException is thrown as follows:
java.lang.UnsupportedOperationException: Cannot change checkpoint interval of an DStream after streaming context has started
at org.apache.spark.streaming.dstream.DStream.checkpoint(DStream.scala:177)
... 43 elided
Internally, checkpoint method calls persist (which sets the default MEMORY_ONLY_SER storage level).
If the checkpoint interval is set, the checkpoint directory is mandatory. Spark validates it when StreamingContext starts and throws an IllegalArgumentException if it is not set.
java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().
You can see the value of the checkpoint interval for a dstream in the logs when it is validated:
INFO Checkpoint interval = [checkpointDuration]
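A typical setup (a sketch that builds on the earlier socketTextStream example; the checkpoint directory is a placeholder):

```scala
import org.apache.spark.streaming.Seconds

// the checkpoint directory is mandatory once any dstream is checkpointed
ssc.checkpoint("/tmp/checkpoints")

val words = lines.flatMap(_.split(" "))

// set the checkpoint interval; this must happen before ssc.start(),
// otherwise UnsupportedOperationException is thrown
words.checkpoint(Seconds(30))

ssc.start()
```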
Checkpointing
DStreams can checkpoint input data at specified time intervals.
The following settings are internal to a dstream and define how it checkpoints the input data if any.
- mustCheckpoint (default: false) is an internal private flag that marks a dstream as being checkpointed (true) or not (false). It is an implementation detail that the author of a DStream implementation sets. Refer to Initializing DStreams (initialize method) to learn how it is used to set the checkpoint interval, i.e. checkpointDuration.
- checkpointDuration is a configurable property that says how often a dstream checkpoints data. It is often called the checkpoint interval. If not set explicitly but the dstream is checkpointed, it is set while initializing dstreams.
- checkpointData is an instance of DStreamCheckpointData.
- restoredFromCheckpointData (default: false) is an internal flag that describes the initial state of a dstream, i.e. whether (true) or not (false) it was started by restoring state from checkpoint.
Validating Setup at Startup — validateAtStart Method
Caution: FIXME Describe me!
Registering Output Streams — register Method
register(): DStream[T]
DStream by design has no notion of being an output stream. It is DStreamGraph that knows and is able to differentiate between input and output streams.
DStream comes with an internal register method that registers a DStream as an output stream.
The internal private foreachRDD method uses register to register output streams to DStreamGraph. Whenever called, it creates a ForEachDStream and calls register upon it. That is how streams become output streams (see the sketch below).
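For example (a sketch that builds on the earlier words dstream):

```scala
// print() is implemented on top of foreachRDD, so it registers an output stream
words.map((_, 1)).print()

// calling foreachRDD directly also creates and registers a ForEachDStream
words.foreachRDD { rdd =>
  println(s"batch size: ${rdd.count()}")
}
```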
Generating Streaming Job For Batch For Output DStream — generateJob Internal Method
generateJob(time: Time): Option[Job]
generateJob generates a streaming job for a batch time for an (output) dstream. It generates a streaming job for the requested batch time only when there is a RDD to process.
Note: generateJob is called when DStreamGraph generates jobs for a batch time.
It computes an RDD for the batch and, if there is one, returns a streaming job for the batch time whose job function, when executed, runs a Spark job on the generated RDD.
Note: The Spark job uses an empty function to calculate partitions of a RDD.
Caution: FIXME What happens when SparkContext.runJob(rdd, emptyFunc) is called with the empty function, i.e. (iterator: Iterator[T]) ⇒ {}?
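The method can be sketched as follows (a simplified paraphrase of DStream.generateJob from the Spark 1.x sources; Job is a private[streaming] class in org.apache.spark.streaming.scheduler):

```scala
// a simplified paraphrase of DStream.generateJob (Spark 1.x)
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        // a no-op per-partition function: the Spark job merely materializes the RDD
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```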
Computing RDD for Batch — getOrCompute Internal Method
getOrCompute(time: Time): Option[RDD[T]]
getOrCompute returns an optional RDD for a time batch.
Note: getOrCompute is a private[streaming] final method.
getOrCompute uses generatedRDDs to return the RDD if it has already been generated for the time. If not, it generates one by computing the input stream (using compute(validTime: Time) method).
If there was anything to process in the input stream, i.e. computing the input stream returned a RDD, the RDD is first persisted (but only when the storageLevel of the dstream is different from the NONE storage level).
You should see the following DEBUG message in the logs:
DEBUG Persisting RDD [id] for time [time] to [storageLevel]
The generated RDD is checkpointed if checkpointDuration is defined and the time interval between current and zero times is a multiple of checkpointDuration.
You should see the following DEBUG message in the logs:
DEBUG Marking RDD [id] for time [time] for checkpointing
The generated RDD is saved in the internal generatedRDDs registry.
Note: getOrCompute is used when a DStream is requested to generate a streaming job for a batch.
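The caching logic can be sketched as follows (a simplified paraphrase of DStream.getOrCompute from the Spark 1.x sources; batch-time validation and local-property handling are omitted):

```scala
// a simplified paraphrase of DStream.getOrCompute (Spark 1.x)
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  generatedRDDs.get(time).orElse {
    val rddOption = compute(time) // generate a RDD from the input data, if any
    rddOption.foreach { newRDD =>
      if (storageLevel != StorageLevel.NONE) {
        newRDD.persist(storageLevel)
        logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
      }
      if (checkpointDuration != null &&
          (time - zeroTime).isMultipleOf(checkpointDuration)) {
        newRDD.checkpoint()
        logDebug(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
      }
      generatedRDDs.put(time, newRDD) // cache the generated RDD for the batch
    }
    rddOption
  }
}
```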
Caching and Persisting
Caution: FIXME
Checkpoint Cleanup
Caution: FIXME
restoreCheckpointData
restoreCheckpointData(): Unit
restoreCheckpointData does its work only when the internal transient restoredFromCheckpointData flag is disabled (i.e. false), which is its initial state.
Note: restoreCheckpointData method is called when DStreamGraph is requested to restore state of output dstreams.
If restoredFromCheckpointData is disabled, you should see the following INFO message in the logs:
INFO ...DStream: Restoring checkpoint data
DStreamCheckpointData.restore() is executed, and then the restoreCheckpointData method is executed for every dstream the current dstream depends on (see DStream Contract).
Once completed, the internal restoredFromCheckpointData flag is enabled (i.e. true) and you should see the following INFO message in the logs:
INFO Restored checkpoint data
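The method can be sketched as follows (a simplified paraphrase of DStream.restoreCheckpointData from the Spark 1.x sources):

```scala
// a simplified paraphrase of DStream.restoreCheckpointData (Spark 1.x)
private[streaming] def restoreCheckpointData(): Unit = {
  if (!restoredFromCheckpointData) {
    logInfo("Restoring checkpoint data")
    checkpointData.restore() // re-create RDDs from the checkpoint data
    dependencies.foreach(_.restoreCheckpointData())
    restoredFromCheckpointData = true
    logInfo("Restored checkpoint data")
  }
}
```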
Metadata Cleanup — clearMetadata Method
Note: It is called when DStreamGraph clears metadata for every output stream.
clearMetadata(time: Time) is called to remove old RDDs that have been generated so far (and collected in generatedRDDs). It is a sort of garbage collector.
When clearMetadata(time: Time) is called, it checks the spark.streaming.unpersist flag (enabled by default).
It collects generated RDDs (from generatedRDDs) that are older than rememberDuration.
You should see the following DEBUG message in the logs:
DEBUG Clearing references to old RDDs: [[time] -> [rddId], ...]
Regardless of spark.streaming.unpersist flag, all the collected RDDs are removed from generatedRDDs.
When spark.streaming.unpersist flag is set (it is by default), you should see the following DEBUG message in the logs:
DEBUG Unpersisting old RDDs: [id1, id2, ...]
Every RDD in the list is unpersisted (without blocking) one by one, and for BlockRDDs the blocks are explicitly removed. You should see the following INFO message in the logs:
INFO Removing blocks of RDD [blockRDD] of time [time]
After RDDs have been removed from generatedRDDs (and perhaps unpersisted), you should see the following DEBUG message in the logs:
DEBUG Cleared [size] RDDs that were older than [time]: [time1, time2, ...]
The stream passes the call to clear metadata to its dependencies.
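The whole procedure can be sketched as follows (a simplified paraphrase of DStream.clearMetadata from the Spark 1.x sources):

```scala
// a simplified paraphrase of DStream.clearMetadata (Spark 1.x)
private[streaming] def clearMetadata(time: Time): Unit = {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")

  // remove the old RDDs regardless of the spark.streaming.unpersist flag
  generatedRDDs --= oldRDDs.keys

  if (unpersistData) {
    logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false) // non-blocking
      rdd match {
        case b: BlockRDD[_] => // explicitly remove the blocks of BlockRDDs
          logInfo(s"Removing blocks of RDD $b of time $time")
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug(s"Cleared ${oldRDDs.size} RDDs that were older than " +
    s"${time - rememberDuration}: ${oldRDDs.keys.mkString(", ")}")

  // pass the call on to the parent dstreams
  dependencies.foreach(_.clearMetadata(time))
}
```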
updateCheckpointData Method
updateCheckpointData(currentTime: Time): Unit
Note: It is called when DStreamGraph is requested to do updateCheckpointData itself.
When updateCheckpointData is called, you should see the following DEBUG message in the logs:
DEBUG Updating checkpoint data for time [currentTime] ms
It then executes DStreamCheckpointData.update(currentTime) and calls updateCheckpointData method on each dstream the dstream depends on.
When updateCheckpointData finishes, you should see the following DEBUG message in the logs:
DEBUG Updated checkpoint data for time [currentTime]: [checkpointData]
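The method can be sketched as follows (a simplified paraphrase of DStream.updateCheckpointData from the Spark 1.x sources):

```scala
// a simplified paraphrase of DStream.updateCheckpointData (Spark 1.x)
private[streaming] def updateCheckpointData(currentTime: Time): Unit = {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime) // record checkpoint files for the current batch
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
```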