import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
.appName("My Spark Application") // optional; if not set, a name is generated automatically
.master("local[*]") // deployment mode: local, standalone, mesos, yarn
.enableHiveSupport() // enable Hive support
.config("spark.sql.warehouse.dir", "target/spark-warehouse") // warehouse location (described below)
.getOrCreate
SparkSession — The Entry Point to Spark SQL
SparkSession is the entry point to Spark SQL. It is the very first object you have to create when working with Spark SQL.
Note
|
Spark 2.0 merged SQLContext and HiveContext into a single SparkSession. |
You can create a SparkSession object using SparkSession.builder.
Call the stop method to stop a SparkSession.
spark.stop
A single Spark application can have multiple SparkSessions.
A SparkSession internally maintains a SparkContext and an optional SharedState that can be shared across multiple SparkSession instances.
Method | Description |
---|---|
builder | Creates a Builder used to get or create a SparkSession instance. |
version | Returns the current version of Spark. |
implicits | Implicit conversions from Scala objects to Datasets; bring them into scope with import spark.implicits._ |
emptyDataset | Creates an empty Dataset. |
range | Creates a Dataset of Long numbers. |
sql | Executes a SQL statement and returns a DataFrame. |
udf | Registers user-defined SQL functions (UDFs). |
table | Maps a table to a DataFrame. |
catalog | Accesses the metadata (schema information) of structured entities. |
read | Reads files or data from external storage systems into a DataFrame. |
conf | Returns the current runtime configuration. |
readStream | Reads streaming data. |
streams | Manages structured streaming queries. |
newSession | Creates a new SparkSession. |
stop | Stops the SparkSession. |
Tip
|
Use the spark.sql.warehouse.dir configuration property to change the location of Hive's hive.metastore.warehouse.dir, i.e. the Hive warehouse directory (see SharedState below). |
Creating a SparkSession Object — builder Method
builder(): Builder
builder creates a Builder object that you then use to build a SparkSession instance.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
Implicit Conversions — implicits Object
The implicits object holds Scala implicit conversions that make it easy to convert Scala objects into Datasets, DataFrames, and Columns. It also defines Encoders for Scala's "basic" types (e.g. Int, Double).
Note
|
Bring the conversions into scope with import spark.implicits._ |
implicits supports creating a Dataset from an RDD (with an Encoder in scope), case classes, tuples, and Seq.
implicits also supports converting a Scala Symbol or $ into a Column.
It supports converting an RDD or a Seq of Product types (case classes or tuples) into a DataFrame.
It also supports converting an RDD of Int, Long, or String into a DataFrame with a single column named _1.
Note: converting an RDD of basic types (Int, Long, String) into a DataFrame requires calling toDF explicitly.
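Below is a small sketch of these conversions in action (the Person case class and the values are illustrative only; a running SparkSession named spark is assumed):
import spark.implicits._
// case classes and tuples => Datasets / DataFrames
case class Person(name: String, age: Int)
val peopleDS = Seq(Person("Alice", 30), Person("Bob", 25)).toDS
val peopleDF = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")
// basic types need an explicit toDS/toDF call
val ints = Seq(1, 2, 3).toDF("n")
// Scala Symbols and $"..." become Columns
peopleDF.select('name, $"age" + 1).show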
Creating an Empty Dataset — emptyDataset Method
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset creates an empty Dataset.
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset creates a LocalRelation logical query plan.
Creating Datasets from Collections or RDDs — createDataset Method
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
createDataset is a convenient way to create a Dataset from a local collection or an RDD, which is handy for tests and experiments.
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset creates a LocalRelation logical query plan.
Tip
|
You can also use the Scala implicit conversions or the toDS method to do the same. |
val spark: SparkSession = ...
import spark.implicits._
scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]
Note: only unresolved expression encoders are currently supported.
The encoder expression is then used to map over the input collection or RDD and produce the Dataset.
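A quick sketch of the RDD variant (a SparkSession named spark is assumed; the data is illustrative):
import spark.implicits._
// the String encoder comes from spark.implicits._
val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"))
val letters = spark.createDataset(rdd)
letters.show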
Creating a Dataset With a Single Long Column — range Method
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
The range family of methods creates a Dataset of Long numbers.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
Note: if the fourth parameter (numPartitions) is not specified, range defaults to SparkContext.defaultParallelism.
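A small sketch to verify the default (assuming a local[*] master, so defaultParallelism equals the number of cores):
val ids = spark.range(10)  // start = 0, step = 1
// without numPartitions, the Dataset uses SparkContext.defaultParallelism partitions
println(ids.rdd.getNumPartitions)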
Creating an Empty DataFrame — emptyDataFrame Method
emptyDataFrame: DataFrame
emptyDataFrame creates an empty DataFrame with no rows and no columns.
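A minimal sketch, assuming a running SparkSession named spark:
val empty = spark.emptyDataFrame
println(empty.schema)  // an empty StructType (no columns)
println(empty.count)   // 0 (no rows)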
Creating DataFrames from RDDs with Explicit Schema — createDataFrame Method
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame
creates a DataFrame
using RDD[Row]
and the input schema
. It is assumed that the rows in rowRDD
all match the schema
.
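A hedged sketch of the RDD[Row]-plus-schema variant (the field names and values are made up for illustration):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
val rowRDD = spark.sparkContext.parallelize(Seq(Row("Alice", 30), Row("Bob", 25)))
val people = spark.createDataFrame(rowRDD, schema)
people.show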
Executing SQL Queries — sql
method
sql(sqlText: String): DataFrame
sql
executes the sqlText
SQL statement.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, it creates a Dataset using the current SparkSession
and a logical plan. The plan is created by parsing the input sqlText
using sessionState.sqlParser.
Caution
|
FIXME See Executing SQL Queries. |
Accessing UDF Registration Interface — udf
Attribute
udf: UDFRegistration
udf
attribute gives access to UDFRegistration
that allows registering user-defined functions for SQL-based query expressions.
val spark: SparkSession = ...
import spark.implicits._
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.createOrReplaceTempView("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is an alias for SessionState.udf.
Creating DataFrames from Tables — table
method
table(tableName: String): DataFrame
table creates a DataFrame from records in the tableName table (if it exists).
val df = spark.table("mytable")
Accessing Metastore — catalog
Attribute
catalog: Catalog
catalog
attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
Tip
|
All methods in Catalog return Datasets .
|
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog
creates a CatalogImpl (referencing the current SparkSession
).
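A few more Catalog calls, shown as a sketch (the strs temporary view comes from the udf example above):
spark.catalog.listDatabases.show
spark.catalog.cacheTable("strs")
println(spark.catalog.isCached("strs"))  // true once cached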
Accessing DataFrameReader — read
method
read: DataFrameReader
read
method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame
.
val spark: SparkSession = ... // create the instance first
val dfReader: DataFrameReader = spark.read
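For example, a hedged sketch of loading a CSV file into a DataFrame (the path is hypothetical):
val csvDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/people.csv")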
Runtime Configuration — conf
attribute
conf: RuntimeConfig
conf
returns the current runtime configuration (as RuntimeConfig
) that wraps SQLConf.
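A minimal sketch of reading and changing runtime settings (assumes a running SparkSession named spark):
spark.conf.set("spark.sql.shuffle.partitions", "8")       // change a SQL setting at runtime
println(spark.conf.get("spark.sql.shuffle.partitions"))   // 8
println(spark.conf.getOption("spark.sql.warehouse.dir"))  // Some(...) if configured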
Caution
|
FIXME |
sessionState
Property
sessionState
is a transient lazy value that represents the current SessionState.
Note
|
sessionState is a private[sql] value so you can only access it in a code inside org.apache.spark.sql package.
|
sessionState
is a lazily-created value based on the internal spark.sql.catalogImplementation setting that can be:
- org.apache.spark.sql.hive.HiveSessionState for hive
- org.apache.spark.sql.internal.SessionState for in-memory
streams
Attribute
streams: StreamingQueryManager
streams
attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimentalMethods
Attribute
experimental: ExperimentalMethods
experimentalMethods
is an extension point with ExperimentalMethods
that is a per-session collection of extra strategies and Rule[LogicalPlan]
s.
Note
|
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
|
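A small sketch of inspecting the extension points (the field names assume Spark 2.x's ExperimentalMethods):
val spark: SparkSession = ...
println(spark.experimental.extraStrategies)     // extra planning strategies (empty by default)
println(spark.experimental.extraOptimizations)  // extra optimizer rules (empty by default)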
newSession
method
newSession(): SparkSession
newSession
creates (starts) a new SparkSession
(with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
sharedState
Attribute
sharedState
is the current SharedState. It is created lazily when first accessed.
SharedState
SharedState
is an internal class that holds the shared state across active SQL sessions (as SparkSession instances) by sharing CacheManager, SQLListener, and ExternalCatalog.
Tip
|
Enable INFO logging level for the org.apache.spark.sql.internal.SharedState logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.internal.SharedState=INFO
Refer to Logging. |
SharedState
requires a SparkContext when created. It also adds hive-site.xml
to Hadoop’s Configuration
in the current SparkContext if found on CLASSPATH.
Note
|
hive-site.xml is an optional Hive configuration file when working with Hive in Spark.
|
The fully-qualified class name is org.apache.spark.sql.internal.SharedState
.
SharedState
is created lazily, i.e. when first accessed after SparkSession
is created. It can happen when a new session is created or when the shared services are accessed. It is created with a SparkContext.
When created, SharedState sets hive.metastore.warehouse.dir to spark.sql.warehouse.dir if hive.metastore.warehouse.dir is not set or spark.sql.warehouse.dir is set. Otherwise, when hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not, spark.sql.warehouse.dir gets set to hive.metastore.warehouse.dir. You should see the following INFO message in the logs:
INFO spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('[hiveWarehouseDir]').
You should see the following INFO message in the logs:
INFO SharedState: Warehouse path is '[warehousePath]'.
Stopping SparkSession — stop
Method
stop(): Unit
stop
stops the SparkSession
, i.e. stops the underlying SparkContext
.
Creating SparkSession
Instance
Caution
|
FIXME |
baseRelationToDataFrame
Method
Caution
|
FIXME |