Caching

Use CACHE TABLE [tableName] to cache the table tableName in memory. Caching is eager by default: the table is cached as soon as the statement is executed.

sql("CACHE TABLE [tableName]")

Add the LAZY keyword, i.e. CACHE LAZY TABLE [tableName], to defer caching until the table is first used.
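
The difference between registering a table for caching and materializing it can be sketched as follows. This is a minimal illustration, not taken from the original text: the view name nums and the application name are hypothetical, and the session is created explicitly so the snippet also runs outside spark-shell (inside spark-shell, getOrCreate simply returns the existing session).

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder
  .master("local[*]")
  .appName("cache-table-sketch") // hypothetical application name
  .getOrCreate()

// Hypothetical temporary view used only for this illustration
spark.range(3).createOrReplaceTempView("nums")

// LAZY registers the table for caching without running a job
spark.sql("CACHE LAZY TABLE nums")
val registered = spark.catalog.isCached("nums")

// The first action that scans the table materializes the cache
spark.table("nums").count()

// Remove the table from the cache again
spark.sql("UNCACHE TABLE nums")
val afterUncache = spark.catalog.isCached("nums")

println(s"registered=$registered afterUncache=$afterUncache")
```

Note that isCached only reports that the table is registered with the cache, so it should already be true right after the LAZY statement, before any data has been stored.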

// Cache Dataset -- it is lazy
scala> val df = spark.range(1).cache
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Trigger caching
scala> df.show
+---+
| id|
+---+
|  0|
+---+

// Visit http://localhost:4040/storage to confirm that the Dataset is cached.

// You may also use queryExecution or explain to see InMemoryRelation
// InMemoryRelation is used for cached queries
scala> df.queryExecution.withCachedData
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *Range (0, 1, step=1, splits=Some(8))

// Use the cached Dataset in another query
// Notice InMemoryRelation in use for cached queries
scala> df.withColumn("newId", 'id).explain(extended = true)
== Parsed Logical Plan ==
'Project [*, 'id AS newId#16]
+- Range (0, 1, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint, newId: bigint
Project [id#0L, id#0L AS newId#16L]
+- Range (0, 1, step=1, splits=Some(8))

== Optimized Logical Plan ==
Project [id#0L, id#0L AS newId#16L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *Range (0, 1, step=1, splits=Some(8))

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

// Clear in-memory cache using SQL
// Equivalent to spark.catalog.clearCache
scala> sql("CLEAR CACHE").collect
res1: Array[org.apache.spark.sql.Row] = Array()

// Visit http://localhost:4040/storage to confirm that the cache is empty

Caching Dataset — cache Method

cache(): this.type

cache merely executes the no-argument persist method.

val ds = spark.range(5).cache

Persisting Dataset — persist Method

persist(): this.type
persist(newLevel: StorageLevel): this.type

persist caches the Dataset using newLevel if given, or the default storage level MEMORY_AND_DISK otherwise, and returns the Dataset.

Internally, persist requests the CacheManager (accessible through the SharedState of the current SparkSession) to cache the query.
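
A short sketch of both persist variants, assuming a Spark version that offers Dataset.storageLevel (2.1 or later). The value names defaultDs and memOnlyDs are hypothetical; two different ranges are used so that each Dataset gets its own cache entry.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder
  .master("local[*]")
  .appName("persist-sketch") // hypothetical application name
  .getOrCreate()

// No-argument persist (what cache calls): default storage level
val defaultDs = spark.range(5).persist()

// Explicit storage level
val memOnlyDs = spark.range(10).persist(StorageLevel.MEMORY_ONLY)

// storageLevel reports the level the query is registered with,
// even before an action has materialized the cached data
println(defaultDs.storageLevel == StorageLevel.MEMORY_AND_DISK)
println(memOnlyDs.storageLevel == StorageLevel.MEMORY_ONLY)

// Clean up so later examples start from an empty cache
defaultDs.unpersist()
memOnlyDs.unpersist()
```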

Unpersisting Dataset — unpersist Method

unpersist(blocking: Boolean): this.type

unpersist uncaches the Dataset, optionally blocking until the cached blocks are removed.

Internally, unpersist requests the CacheManager to uncache the query.
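
The round trip can be sketched as below, again assuming Dataset.storageLevel is available (Spark 2.1 or later). The names ds, levelBefore and levelAfter are hypothetical and only serve the illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session; in spark-shell this returns the existing one
val spark = SparkSession.builder
  .master("local[*]")
  .appName("unpersist-sketch") // hypothetical application name
  .getOrCreate()

val ds = spark.range(7).cache()
ds.count() // materialize the cached data

val levelBefore = ds.storageLevel

// blocking = true waits until the cached blocks have been removed
ds.unpersist(blocking = true)

// Once uncached, the Dataset reports StorageLevel.NONE
val levelAfter = ds.storageLevel

println(s"before=$levelBefore after=$levelAfter")
```

Passing blocking = true is mostly useful in tests or when memory has to be reclaimed before the next step; otherwise a non-blocking call lets the removal proceed in the background.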
