StreamExecution
StreamExecution manages execution of a streaming query for a SQLContext and a Sink. It requires a LogicalPlan to know the Source objects from which records are periodically pulled down.
StreamExecution is a StreamingQuery with additional attributes:

- checkpointRoot
- Trigger
It starts an internal thread (microBatchThread) to periodically (every 10 milliseconds) poll for new records in the sources and create a batch.
Note: The time between batches (10 milliseconds) is fixed, i.e. not configurable.
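The fixed-interval polling described above can be sketched with a plain daemon thread. This is an illustrative model only, not Spark's actual implementation; the names microBatchThread and pollIntervalMs mirror the text, and "running a batch" is reduced to a counter bump where Spark would check the sources for new offsets:

```scala
import java.util.concurrent.atomic.AtomicInteger

object PollingSketch {
  def main(args: Array[String]): Unit = {
    val pollIntervalMs = 10L                 // fixed interval, as in StreamExecution
    val batchesRun = new AtomicInteger(0)

    // Sketch of the microBatchThread: poll at a fixed interval and "run a batch".
    // Spark would instead ask each Source for new records and process them.
    val microBatchThread = new Thread(new Runnable {
      def run(): Unit =
        while (batchesRun.get < 3) {         // stop after a few batches for the demo
          batchesRun.incrementAndGet()
          Thread.sleep(pollIntervalMs)
        }
    })
    microBatchThread.setDaemon(true)
    microBatchThread.start()
    microBatchThread.join()

    println(s"batches run: ${batchesRun.get}")
  }
}
```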
StreamExecution can be in three states:

- INITIALIZED when the instance was created
- ACTIVE when batches are pulled from the sources
- TERMINATED when batches were successfully processed or the query stopped
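The three-state lifecycle above can be modeled as a small state machine. A minimal sketch, assuming the transitions described in the list; the Query class and its start/stop methods are hypothetical helpers, not Spark's code:

```scala
object StateSketch {
  sealed trait State
  case object INITIALIZED extends State  // the instance was created
  case object ACTIVE extends State       // batches are pulled from the sources
  case object TERMINATED extends State   // batches processed or query stopped

  // Hypothetical query wrapper that walks through the lifecycle.
  class Query {
    private var state: State = INITIALIZED
    def start(): Unit = { state = ACTIVE }
    def stop(): Unit  = { state = TERMINATED }
    def currentState: State = state
  }

  def main(args: Array[String]): Unit = {
    val q = new Query
    println(q.currentState)  // INITIALIZED
    q.start()
    println(q.currentState)  // ACTIVE
    q.stop()
    println(q.currentState)  // TERMINATED
  }
}
```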
Tip: Enable DEBUG logging level for org.apache.spark.sql.execution.streaming.StreamExecution to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG

Refer to Logging.
runBatches Internal Method
runBatches(): Unit
Caution: FIXME
scala> val out = in.write
.format("memory")
.queryName("memStream")
.startStream()
out: org.apache.spark.sql.StreamingQuery = Continuous Query - memStream [state = ACTIVE]
16/04/16 00:48:47 INFO StreamExecution: Starting new continuous query.
scala> 16/04/16 00:48:47 INFO StreamExecution: Committed offsets for batch 1.
16/04/16 00:48:47 DEBUG StreamExecution: Stream running from {} to {FileSource[hello]: #0}
16/04/16 00:48:47 DEBUG StreamExecution: Retrieving data from FileSource[hello]: None -> #0
16/04/16 00:48:47 DEBUG StreamExecution: Optimized batch in 163.940239ms
16/04/16 00:48:47 INFO StreamExecution: Completed up to {FileSource[hello]: #0} in 703.573981ms
toDebugString Method
You can call toDebugString on a StreamExecution to learn about its internals.
scala> out.asInstanceOf[StreamExecution].toDebugString
res3: String =
"
=== Continuous Query ===
Name: memStream
Current Offsets: {FileSource[hello]: #0}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
FileSource[hello]
"
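The shape of the report above can be reproduced with a simple formatter. A sketch only: the debugString helper below is hypothetical (not part of Spark), and the field names and layout are copied from the output shown:

```scala
object DebugStringSketch {
  // Builds a report shaped like StreamExecution.toDebugString's output.
  def debugString(name: String, offsets: String, state: String,
                  threadState: String, plan: String): String =
    s"""|=== Continuous Query ===
        |Name: $name
        |Current Offsets: $offsets
        |Current State: $state
        |Thread State: $threadState
        |Logical Plan:
        |$plan""".stripMargin

  def main(args: Array[String]): Unit =
    println(debugString("memStream", "{FileSource[hello]: #0}",
      "ACTIVE", "RUNNABLE", "FileSource[hello]"))
}
```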