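SparkSession — Entry Point to Spark SQL

SparkSession is the entry point to Spark SQL. It is the first object you create when working with Spark SQL.

Note
As of Spark 2.0, SQLContext and HiveContext are unified under SparkSession.

You can create a SparkSession using SparkSession.builder.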

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .appName("My Spark Application")  // optional; a name is generated automatically if not set
  .master("local[*]")               // master URL: local, standalone, Mesos, or YARN
  .enableHiveSupport()              // enables Hive support
  .config("spark.sql.warehouse.dir", "target/spark-warehouse") // warehouse location (described later)
  .getOrCreate()

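Call the stop method to shut down the SparkSession.

spark.stop()

A single Spark application can have multiple SparkSessions.

Internally, a SparkSession maintains a SparkContext and an optional SharedState that is used to share state across SparkSession instances.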

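Table 1. SparkSession Class and Instance Methods

Method            Description
builder           Creates a Builder used to create a SparkSession.
version           Returns the current version of Spark.
implicits         Implicit conversions from Scala objects to Datasets. Use import spark.implicits._ to bring them into scope.
emptyDataset[T]   Creates an empty Dataset[T].
range             Creates a Dataset[Long].
sql               Executes a SQL query and returns a DataFrame.
udf               Registers user-defined SQL functions (UDFs).
table             Creates a DataFrame from a table.
catalog           Accesses the metadata (schema information) of structured data.
read              Reads files or data from external storage systems into a DataFrame.
conf              Returns the current runtime configuration.
readStream        Reads streaming data.
streams           Accesses structured streaming queries.
newSession        Creates a new SparkSession.
stop              Stops the SparkSession.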

Tip

Use the spark.sql.warehouse.dir configuration property to change the value of Hive's hive.metastore.warehouse.dir property.

For more information, see:

SharedState

Hive metastore management.

Creating a SparkSession Object

builder(): Builder

builder creates a Builder that you can use to create a SparkSession.

import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder

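Accessing the Spark Version — version method

version: String

version returns the version of Spark.

Implicit Conversions Object

The implicits object provides implicit conversions for Scala programs that convert Scala objects to Datasets, DataFrames, and Columns. It also defines encoders for Scala's "primitive" types (e.g. Int, Double).

Note

Import the implicit conversions with import spark.implicits._.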

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

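implicits supports creating a Dataset from an RDD of any type for which an encoder is in scope, from case classes or tuples, and from Seq.

implicits also converts Scala's Symbol or $ to Column.

It also converts an RDD or Seq of Product types (e.g. case classes or tuples) to a DataFrame, and an RDD of Int, Long, or String to a DataFrame with a single column named _1.

Note: Call the toDF method to convert an RDD of primitive types (Int, Long, String) to a DataFrame.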
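The conversions described above can be sketched as follows. This is only an illustration: the local master, the application name, and the Person case class are assumptions made for the example and are not part of the original text.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ImplicitsExample {
  // Hypothetical case class used only for illustration
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("implicits-example")
      .getOrCreate()
    import spark.implicits._

    // Seq of case classes to Dataset (the encoder for Person is resolved implicitly)
    val people: Dataset[Person] = Seq(Person("Ann", 30), Person("Bob", 25)).toDS()

    // Seq of tuples to DataFrame with explicit column names
    val pairs: DataFrame = Seq(("a", 1), ("b", 2)).toDF("letter", "number")

    // $"..." and Scala Symbols are both converted to Column
    people.filter($"age" > 26).show()
    pairs.select(Symbol("letter")).show()

    spark.stop()
  }
}
```

The case class is declared outside main so that an encoder can be derived for it.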

Creating Empty Dataset — emptyDataset method

emptyDataset[T: Encoder]: Dataset[T]

emptyDataset creates an empty Dataset.

scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]

scala> strings.printSchema
root
 |-- value: string (nullable = true)

emptyDataset creates a LocalRelation logical query plan.

Creating Dataset from Local Collections and RDDs — createDataset methods

createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]

createDataset is a convenient way to create a Dataset from a collection in tests and experiments.

scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> one.show
+-----+
|value|
+-----+
|    1|
+-----+

createDataset creates a LocalRelation logical query plan for a local collection.

Tip

You can also use Scala implicits or the toDS method to do the same conversion.

val spark: SparkSession = ...
import spark.implicits._

scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]

Internally, createDataset first looks up the implicit expression encoder in scope to access the AttributeReferences of the schema.

Note: Only unresolved expression encoders are currently supported.

The expression encoder is then used to map the elements of the input collection or RDD, and a Dataset is returned.

Creating Dataset With Single Long Column — range methods

range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]

The range family of methods creates a Dataset of Long values.

scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
|  0|
|  2|
+---+

Note: If the fourth argument (numPartitions) is not specified, SparkContext.defaultParallelism is used.

range creates a Dataset[Long] with a Range logical plan and the Encoders.LONG encoder.

Creating Empty DataFrame — emptyDataFrame method

emptyDataFrame: DataFrame

emptyDataFrame creates an empty DataFrame with no rows and no columns.

Creating DataFrames from RDDs with Explicit Schema — createDataFrame method

createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
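A minimal sketch of createDataFrame with an explicit schema might look like the following. The column names, the sample rows, and the local master are assumptions chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object CreateDataFrameExample {
  def build(spark: SparkSession): DataFrame = {
    // Explicit schema: every Row below has to match it field by field
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)))

    // Hypothetical sample data
    val rowRDD = spark.sparkContext.parallelize(Seq(Row("Ann", 30), Row("Bob", 25)))

    spark.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("createDataFrame-example")
      .getOrCreate()

    val df = build(spark)
    df.printSchema()
    df.show()

    spark.stop()
  }
}
```

Because the rows are only assumed to match the schema, a mismatch typically surfaces when an action such as show runs, not when the DataFrame is created.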

Executing SQL Queries — sql method

sql(sqlText: String): DataFrame

sql executes the sqlText SQL statement.

scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]

scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []

// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")

scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata|      false|
+---------+-----------+

Internally, it creates a Dataset using the current SparkSession and a logical plan. The plan is created by parsing the input sqlText using sessionState.sqlParser.

Accessing UDF Registration Interface — udf Attribute

udf: UDFRegistration

udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based query expressions.

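val spark: SparkSession = ...
import spark.implicits._  // needed for toDS
spark.udf.register("myUpper", (s: String) => s.toUpperCase)

val strs = ('a' to 'c').map(_.toString).toDS
// registerTempTable is deprecated as of Spark 2.0
strs.createOrReplaceTempView("strs")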

scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
|    a|    A|
|    b|    B|
|    c|    C|
+-----+-----+

Internally, it is an alias for SessionState.udf.

Creating DataFrames from Tables — table method

table(tableName: String): DataFrame

table creates a DataFrame from records in the tableName table (if exists).

val df = spark.table("mytable")

Accessing Metastore — catalog Attribute

catalog: Catalog

catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).

Tip
All methods in Catalog return Datasets.

scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
|              name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default|       null|  MANAGED|      false|
|              strs|    null|       null|TEMPORARY|       true|
+------------------+--------+-----------+---------+-----------+

Internally, catalog creates a CatalogImpl (referencing the current SparkSession).

Accessing DataFrameReader — read method

read: DataFrameReader

read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.

import org.apache.spark.sql.{DataFrameReader, SparkSession}
val spark: SparkSession = SparkSession.builder.getOrCreate()
val dfReader: DataFrameReader = spark.read
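To illustrate loading data through the DataFrameReader, the sketch below writes a small JSON Lines file to a temporary location and reads it back. The file contents and the local master are assumptions made so the example is self-contained.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadExample {
  // Writes a small JSON Lines file (hypothetical data) and loads it back
  def load(spark: SparkSession): DataFrame = {
    val path = Files.createTempFile("people", ".json")
    val lines = Seq(
      """{"name":"Ann","age":30}""",
      """{"name":"Bob","age":25}""").mkString("\n")
    Files.write(path, lines.getBytes(StandardCharsets.UTF_8))

    // read returns a DataFrameReader; json is one of its format-specific loaders
    spark.read.json(path.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("read-example")
      .getOrCreate()

    val df = load(spark)
    df.printSchema()
    df.show()

    spark.stop()
  }
}
```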

Runtime Configuration — conf attribute

conf: RuntimeConfig

conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
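As a rough sketch of how the runtime configuration can be used, the example below sets a property and reads it back. The local master and the key spark.example.missing are assumptions made for illustration; the latter is a made-up key that is not set anywhere.

```scala
import org.apache.spark.sql.SparkSession

object RuntimeConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("conf-example")
      .getOrCreate()

    // Set a Spark SQL property at runtime and read it back
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    println(spark.conf.get("spark.sql.shuffle.partitions"))

    // getOption returns an Option instead of throwing for keys that are not set
    println(spark.conf.getOption("spark.example.missing"))

    spark.stop()
  }
}
```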

Caution
FIXME

sessionState Property

sessionState is a transient lazy value that represents the current SessionState.

Note
sessionState is a private[sql] value, so you can only access it from code inside the org.apache.spark.sql package.

sessionState is a lazily-created value based on the internal spark.sql.catalogImplementation setting that can be:

  • org.apache.spark.sql.hive.HiveSessionState for hive

  • org.apache.spark.sql.internal.SessionState for in-memory

readStream method

readStream: DataStreamReader

readStream returns a new DataStreamReader.

streams Attribute

streams: StreamingQueryManager

streams attribute gives access to StreamingQueryManager (through SessionState).

val spark: SparkSession = ...
spark.streams.active.foreach(println)

streamingQueryManager Attribute

streamingQueryManager is…​

listenerManager Attribute

listenerManager is…​

ExecutionListenerManager

ExecutionListenerManager is…​

functionRegistry Attribute

functionRegistry is…​

experimental Attribute

experimental: ExperimentalMethods

experimental is an extension point with ExperimentalMethods, a per-session collection of extra strategies and Rule[LogicalPlan]s.

Note
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
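One way to use this extension point is to register an extra optimization rule for a session. The sketch below assumes a hypothetical NoopRule that returns every plan unchanged; it only shows where such a rule plugs in and is not a useful optimization.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ExperimentalExample {
  // Hypothetical optimizer rule that leaves every logical plan untouched
  object NoopRule extends Rule[LogicalPlan] {
    override def apply(plan: LogicalPlan): LogicalPlan = plan
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("experimental-example")
      .getOrCreate()

    // Register the rule for this session only
    spark.experimental.extraOptimizations = Seq(NoopRule)

    // Queries optimized in this session now also pass through NoopRule
    spark.range(5).filter("id > 2").show()

    spark.stop()
  }
}
```

Extra planning strategies can be registered the same way through extraStrategies.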

newSession method

newSession(): SparkSession

newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).

scala> println(sc.version)
2.0.0-SNAPSHOT

scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
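The following sketch illustrates what is shared between sessions and what is not. It assumes a local master and a hypothetical temporary view named numbers; that temporary views are scoped to a session is stated here as background knowledge about Spark, not something taken from the text above.

```scala
import org.apache.spark.sql.SparkSession

object NewSessionExample {
  // Checks whether a table or view with the given name is visible in a session
  def hasTable(session: SparkSession, name: String): Boolean =
    session.catalog.listTables().collect().exists(_.name == name)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("newSession-example")
      .getOrCreate()
    val other = spark.newSession()

    // Both sessions run on the same SparkContext
    println(spark.sparkContext eq other.sparkContext)

    // A temporary view is registered in the session that created it
    spark.range(3).createOrReplaceTempView("numbers")
    println(hasTable(spark, "numbers"))
    println(hasTable(other, "numbers"))

    spark.stop()
  }
}
```

The first two checks should report true and the last one false, since the view was registered only in the first session.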

sharedState Attribute

sharedState is the current SharedState. It is created lazily when first accessed.

SharedState

SharedState is an internal class that holds the shared state across active SQL sessions (as SparkSession instances) by sharing CacheManager, SQLListener, and ExternalCatalog.

Tip

Enable INFO logging level for org.apache.spark.sql.internal.SharedState logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.internal.SharedState=INFO

Refer to Logging.

SharedState requires a SparkContext when created. It also adds hive-site.xml to Hadoop’s Configuration in the current SparkContext if found on CLASSPATH.

Note
hive-site.xml is an optional Hive configuration file when working with Hive in Spark.

The fully-qualified class name is org.apache.spark.sql.internal.SharedState.

SharedState is created lazily, i.e. when first accessed after SparkSession is created. It can happen when a new session is created or when the shared services are accessed. It is created with a SparkContext.

When created, SharedState sets hive.metastore.warehouse.dir to spark.sql.warehouse.dir if hive.metastore.warehouse.dir is not set or spark.sql.warehouse.dir is set. Otherwise, when hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not, spark.sql.warehouse.dir gets set to hive.metastore.warehouse.dir. You should see the following INFO message in the logs:

INFO spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('[hiveWarehouseDir]').

You should also see the following INFO message in the logs:

INFO SharedState: Warehouse path is '[warehousePath]'.
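The resolution rule described above can be modelled as a small pure function. This is a sketch of the rule as stated in the text, not Spark's actual implementation; WarehouseDirSketch, resolve, and the sparkDefault parameter are hypothetical names introduced for illustration.

```scala
object WarehouseDirSketch {
  // Returns the resolved (hive.metastore.warehouse.dir, spark.sql.warehouse.dir) pair.
  // sparkDefault stands in for the value spark.sql.warehouse.dir has when not set explicitly.
  def resolve(
      hiveDir: Option[String],
      sparkDir: Option[String],
      sparkDefault: String): (String, String) =
    (hiveDir, sparkDir) match {
      // Hive's property is set and Spark's is not: Spark's property takes Hive's value
      case (Some(hive), None) => (hive, hive)
      // Otherwise Hive's property takes Spark's value (explicit or default)
      case _ =>
        val dir = sparkDir.getOrElse(sparkDefault)
        (dir, dir)
    }

  def main(args: Array[String]): Unit = {
    println(resolve(Some("/hive/warehouse"), None, "spark-warehouse"))
    println(resolve(Some("/hive/warehouse"), Some("/spark/warehouse"), "spark-warehouse"))
    println(resolve(None, None, "spark-warehouse"))
  }
}
```

In every case both properties end up with the same value, which is the path reported in the "Warehouse path is" log message.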

Stopping SparkSession — stop Method

stop(): Unit

stop stops the SparkSession, i.e. stops the underlying SparkContext.

Creating SparkSession Instance

Caution
FIXME

baseRelationToDataFrame Method

Caution
FIXME
