StreamingQueryManager - Streaming Query Management
Note: StreamingQueryManager is an experimental feature of Spark 2.0.0.
StreamingQueryManager is the management API for continuous queries per SQLContext.
Note: There is a single StreamingQueryManager instance per SQLContext session.
You can access the StreamingQueryManager for the current SQLContext using the SQLContext.streams method. It is created lazily for each SQLContext instance.
val queries = spark.streams
Initialization
StreamingQueryManager manages the following instances:
- StateStoreCoordinatorRef (as stateStoreCoordinator)
- StreamingQueryListenerBus (as listenerBus)
- activeQueries, which is a mutable mapping between query names and StreamingQuery objects.
startQuery
startQuery(name: String,
checkpointLocation: String,
df: DataFrame,
sink: Sink,
trigger: Trigger = ProcessingTime(0)): StreamingQuery
startQuery is a private[sql] method to start a StreamingQuery.
Note: It is called exclusively by DataStreamWriter.start.
Note: By default, trigger is ProcessingTime(0).
startQuery first checks that the activeQueries internal registry does not already contain a query registered under name. If it does, startQuery throws an IllegalArgumentException.
startQuery then transforms the LogicalPlan of the input DataFrame df so that every StreamingRelation "node" becomes a StreamingExecutionRelation. For each such node it uses DataSource.createSource(metadataPath), where metadataPath is $checkpointLocation/sources/$nextSourceId. Any other node in the LogicalPlan is left untouched.
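The rewrite described above can be pictured with a minimal sketch. The plan node types below are hypothetical, heavily reduced stand-ins defined only for this example (Spark's LogicalPlan is far richer), and incrementing nextSourceId once per rewritten source is an assumption, not something the text states.

```scala
object PlanRewriteSketch {
  // Hypothetical, reduced plan nodes for illustration only; not Spark's classes.
  sealed trait Plan
  final case class StreamingRelation(sourceName: String) extends Plan
  final case class StreamingExecutionRelation(sourceName: String, metadataPath: String) extends Plan
  final case class Project(child: Plan) extends Plan
  final case class LocalRelation(rows: Int) extends Plan

  // Replaces every StreamingRelation with a StreamingExecutionRelation and
  // leaves all other nodes as they are.
  def rewrite(plan: Plan, checkpointLocation: String): Plan = {
    var nextSourceId = 0L // assumption: one id per rewritten source
    def go(node: Plan): Plan = node match {
      case StreamingRelation(name) =>
        val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
        nextSourceId += 1
        StreamingExecutionRelation(name, metadataPath)
      case Project(child) => Project(go(child))
      case other          => other // non-streaming nodes are untouched
    }
    go(plan)
  }
}
```

A plan without any StreamingRelation node comes back unchanged, which is the "untouched" case mentioned above.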
Finally, startQuery creates a StreamExecution, starts it, and registers the StreamExecution instance in the activeQueries internal registry.
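The name check and registration can be modelled in a few lines of plain Scala. This is only a sketch of the behaviour described above: QuerySketch and QueryRegistrySketch are hypothetical names, the exception message wording is illustrative, and the real method also builds and starts a StreamExecution, which is omitted here.

```scala
import scala.collection.mutable

// Hypothetical stand-in for StreamingQuery; only the name matters here.
final case class QuerySketch(name: String)

// Simplified model of the activeQueries registry.
class QueryRegistrySketch {
  private val activeQueries = mutable.HashMap.empty[String, QuerySketch]

  // Rejects a name that is already registered, otherwise creates the query
  // and registers it under that name.
  def startQuery(name: String): QuerySketch = activeQueries.synchronized {
    if (activeQueries.contains(name)) {
      // Illustrative message, not copied from Spark.
      throw new IllegalArgumentException(s"A query named $name is already active")
    }
    val query = QuerySketch(name)
    activeQueries.put(name, query)
    query
  }

  // Snapshot of all currently registered queries.
  def active: Array[QuerySketch] = activeQueries.synchronized {
    activeQueries.values.toArray
  }
}
```

Starting a second query under the same name fails fast, so a name always identifies at most one active query.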
Return All Active Continuous Queries per SQLContext
active: Array[StreamingQuery]
active method returns a collection of StreamingQuery instances for the current SQLContext.
Getting Active Continuous Query By Name
get(name: String): StreamingQuery
get method returns a StreamingQuery by name.
It throws an IllegalArgumentException when no StreamingQuery exists for the name.
java.lang.IllegalArgumentException: There is no active query with name hello
at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
... 49 elided
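The MapLike.getOrElse frames in the stack trace suggest that the lookup is a getOrElse on the registry whose default throws. A minimal sketch of that pattern follows; GetByNameSketch and its single String entry are hypothetical, and real entries would be StreamingQuery objects.

```scala
import scala.collection.mutable

object GetByNameSketch {
  // Hypothetical registry content for illustration only.
  private val activeQueries = mutable.HashMap("events" -> "query-for-events")

  // getOrElse takes its default by name, so the exception is only
  // constructed and thrown when the name is missing.
  def get(name: String): String =
    activeQueries.getOrElse(
      name,
      throw new IllegalArgumentException(s"There is no active query with name $name"))
}
```

Callers that cannot be sure a query is still active may prefer to wrap the call, for example in scala.util.Try, rather than let the exception propagate.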
StreamingQueryListener Management - Adding or Removing Listeners
- addListener(listener: StreamingQueryListener): Unit adds listener to the internal listenerBus.
- removeListener(listener: StreamingQueryListener): Unit removes listener from the internal listenerBus.
postListenerEvent
postListenerEvent(event: StreamingQueryListener.Event): Unit
postListenerEvent posts a StreamingQueryListener.Event to listenerBus.
StreamingQueryListener
Caution: FIXME
StreamingQueryListener is an interface for listening to query life cycle events, i.e. a query start, progress and termination events.
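To make the relationship between addListener, removeListener, and postListenerEvent concrete, here is a small model of a listener bus in plain Scala. All types are hypothetical simplifications written for this example; in particular, the real StreamingQueryListener has its own callbacks and event classes, and the real bus may deliver events asynchronously.

```scala
// Hypothetical, reduced life cycle events: start, progress, termination.
sealed trait EventSketch
final case class Started(name: String) extends EventSketch
final case class Progress(name: String) extends EventSketch
final case class Terminated(name: String) extends EventSketch

// Hypothetical single-callback listener; not Spark's interface.
trait ListenerSketch {
  def onEvent(event: EventSketch): Unit
}

class ListenerBusSketch {
  private var listeners = Vector.empty[ListenerSketch]

  def addListener(listener: ListenerSketch): Unit = synchronized {
    listeners = listeners :+ listener
  }

  def removeListener(listener: ListenerSketch): Unit = synchronized {
    listeners = listeners.filterNot(_ eq listener)
  }

  // Delivers the event synchronously to every listener registered right now.
  def postListenerEvent(event: EventSketch): Unit = {
    val snapshot = synchronized { listeners }
    snapshot.foreach(_.onEvent(event))
  }
}
```

A listener only sees events posted between its addListener and removeListener calls.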
lastTerminatedQuery - internal barrier
Caution: FIXME Why is lastTerminatedQuery needed?
Used in:
- awaitAnyTermination
- awaitAnyTermination(timeoutMs: Long)
Both wait 10 milliseconds before checking whether lastTerminatedQuery is non-null.
It is set in:
- resetTerminated() resets lastTerminatedQuery, i.e. sets it to null.
- notifyQueryTermination(terminatedQuery: StreamingQuery) sets lastTerminatedQuery to terminatedQuery and notifies all the threads that wait on awaitTerminationLock. It is called from StreamExecution.runBatches.
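The interplay of lastTerminatedQuery and awaitTerminationLock can be sketched as a classic wait/notify barrier. The class below is a hypothetical model, not Spark's code: a query name stands in for the StreamingQuery object, and returning a Boolean from the timed variant is an assumption made for the example.

```scala
class TerminationBarrierSketch {
  private val awaitTerminationLock = new Object
  // A query name stands in for the terminated StreamingQuery.
  private var lastTerminatedQuery: String = null

  def resetTerminated(): Unit = awaitTerminationLock.synchronized {
    lastTerminatedQuery = null
  }

  def notifyQueryTermination(terminatedQuery: String): Unit =
    awaitTerminationLock.synchronized {
      lastTerminatedQuery = terminatedQuery
      awaitTerminationLock.notifyAll() // wake every waiting thread
    }

  // Blocks until some query has terminated, re-checking every 10 ms.
  def awaitAnyTermination(): Unit = awaitTerminationLock.synchronized {
    while (lastTerminatedQuery == null) {
      awaitTerminationLock.wait(10)
    }
  }

  // Returns true if a query terminated before the timeout elapsed.
  def awaitAnyTermination(timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    awaitTerminationLock.synchronized {
      while (lastTerminatedQuery == null && System.currentTimeMillis() < deadline) {
        awaitTerminationLock.wait(10)
      }
      lastTerminatedQuery != null
    }
  }
}
```

Because the field stays set until resetTerminated() is called, a thread that starts waiting after a query has already terminated still returns immediately, which is one plausible reason for keeping such a field around.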