import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
.appName("My Spark Application") // 应用名称 可以不设置 如果你不设置会自动生成一个
.master("local[*]") // 应用运行模式 local standalone mesos yarn
.enableHiveSupport() // 启用Hive支持
.config("spark.sql.warehouse.dir", "target/spark-warehouse") //设置仓库地址 后面有介绍
.getOrCreate
SparkSession — Spark SQL的入口
SparkSession 是Spark SQL的入口--使用Spark SQL时需要创建的第一个对象.
|
Note
|
在Spark 2.0将 SQLContext 和 HiveContext 统一放在了SparkSession里. |
你可以使用 SparkSession.builder 创建 SparkSession 对象.
调用stop 方法关闭SparkSession .
spark.stop
在一个Spark应用里可以起多个SparkSession.
SparkSession 内部维护了一个SparkContext 对象 和一个可选的SharedState 对象 用于多个SparkSession对象间共享数据.
| 方法 | 描述 |
|---|---|
用于创建 |
|
获取当前Spark的版本. |
|
包含将Scala对象转换为DataSet的隐式转换 可以通过 导入 |
|
创建一个空的 |
|
创建一个 |
|
执行sql返回一个 |
|
用于设置用户自定定义的SQL 函数(UDFs). |
|
将一个表映射为一个 |
|
用于访问结构化数据的元信息(结构描述信息) |
|
用于读取外部存储系统的文件或数据并生成 |
|
获取当前的配置信息. |
|
用于读取流失数据. |
|
使用结构化流式查询. |
|
创建一个新的 |
|
关闭 |
|
Tip
|
使用配置项 spark.sql.warehouse.dir
可以改变Hive
|
创建 SparkSession 对象
builder(): Builder
builder 创建一个 Builder 对象,你可以用它创建Spark Session对像.
import org.apache.spark.sql.SparkSession
val builder = SparkSession.builder
隐式转换对象
用于scala编程的隐式转换 可以方便的将scala对象转换为 Datasets, DataFrames 和 Columns. 并为scala "基础类型(Int,Double)" 定义了 编码器.
|
Note
|
使用
|
implicits 支持从 RDD (作用域里存在 encoder) case classes,tuples,Seq 创建 Dataset.
implicits 还支持将 Scala的Symbol 和$ 转换成 Column.
支持将包含Product类型(case类,元组)的RDD和Seq转换为DataFrame.
支持将包含Int, Long 和 String 的RDD转换为只有一个名为_1列的DataFrame`.
注意: 需要通过调用toDF方法将包含基本数据类型(Int, Long, String) 的RDD转换为DataFrame.
方法emptyDataset — 创建一个空的DataSet
emptyDataset[T: Encoder]: Dataset[T]
emptyDataset 创建一个空的 Dataset.
scala> val strings = spark.emptyDataset[String]
strings: org.apache.spark.sql.Dataset[String] = [value: string]
scala> strings.printSchema
root
|-- value: string (nullable = true)
emptyDataset 创建了一个 LocalRelation 逻辑查询计划.
方法createDataset — 从集合或RDD创建DataSet
createDataset[T : Encoder](data: Seq[T]): Dataset[T]
createDataset[T : Encoder](data: RDD[T]): Dataset[T]
在测试和实验时用createDataset可以方便的从集合类型创建 Dataset.
scala> val one = spark.createDataset(Seq(1))
one: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> one.show
+-----+
|value|
+-----+
| 1|
+-----+
createDataset 创建了一个 LocalRelation 逻辑查询计划 .
[小贴上]
你也可以使用:spark-sql-dataset.adoc#implicits[Scala隐式转换或者toDS方法] 完成类似的转换.
val spark: SparkSession = ...
import spark.implicits._
scala> val one = Seq(1).toDS
one: org.apache.spark.sql.Dataset[Int] = [value: int]
注意: 当前只支持 unresolved expression encoders.
然后会使用这个encoder表达式对集合或RDD做map操作返回DataSet.
range方法 — 创建只有一个Long类型列的DataSet
range(end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long): Dataset[java.lang.Long]
range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long]
range 系列方法创建包含Long类型数据的 Dataset.
scala> spark.range(start = 0, end = 4, step = 2, numPartitions = 5).show
+---+
| id|
+---+
| 0|
| 2|
+---+
注意: 如果不指定第四个参数(numPartitions) 默认使用
SparkContext.defaultParallelism.
方法emptyDataFrame — 创建一个空的DataFrame
emptyDataFrame: DataFrame
emptyDataFrame 创建一个没有行也没有列的空的 DataFrame.
方法createDataFrame — Creating DataFrames from RDDs with Explicit Schema —
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
createDataFrame creates a DataFrame using RDD[Row] and the input schema. It is assumed that the rows in rowRDD all match the schema.
Executing SQL Queries — sql method
sql(sqlText: String): DataFrame
sql executes the sqlText SQL statement.
scala> sql("SHOW TABLES")
res0: org.apache.spark.sql.DataFrame = [tableName: string, isTemporary: boolean]
scala> sql("DROP TABLE IF EXISTS testData")
res1: org.apache.spark.sql.DataFrame = []
// Let's create a table to SHOW it
spark.range(10).write.option("path", "/tmp/test").saveAsTable("testData")
scala> sql("SHOW TABLES").show
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
| testdata| false|
+---------+-----------+
Internally, it creates a Dataset using the current SparkSession and a logical plan. The plan is created by parsing the input sqlText using sessionState.sqlParser.
|
Caution
|
FIXME See Executing SQL Queries. |
Accessing UDF Registration Interface — udf Attribute
udf: UDFRegistration
udf attribute gives access to UDFRegistration that allows registering user-defined functions for SQL-based query expressions.
val spark: SparkSession = ...
spark.udf.register("myUpper", (s: String) => s.toUpperCase)
val strs = ('a' to 'c').map(_.toString).toDS
strs.registerTempTable("strs")
scala> sql("SELECT *, myUpper(value) UPPER FROM strs").show
+-----+-----+
|value|UPPER|
+-----+-----+
| a| A|
| b| B|
| c| C|
+-----+-----+
Internally, it is an alias for SessionState.udf.
Creating DataFrames from Tables — table method
table(tableName: String): DataFrame
table creates a DataFrame from records in the tableName table (if exists).
val df = spark.table("mytable")
Accessing Metastore — catalog Attribute
catalog: Catalog
catalog attribute is a (lazy) interface to the current metastore, i.e. data catalog (of relational entities like databases, tables, functions, table columns, and temporary views).
|
Tip
|
All methods in Catalog return Datasets.
|
scala> spark.catalog.listTables.show
+------------------+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|my_permanent_table| default| null| MANAGED| false|
| strs| null| null|TEMPORARY| true|
+------------------+--------+-----------+---------+-----------+
Internally, catalog creates a CatalogImpl (referencing the current SparkSession).
Accessing DataFrameReader — read method
read: DataFrameReader
read method returns a DataFrameReader that is used to read data from external storage systems and load it into a DataFrame.
val spark: SparkSession = // create instance
val dfReader: DataFrameReader = spark.read
Runtime Configuration — conf attribute
conf: RuntimeConfig
conf returns the current runtime configuration (as RuntimeConfig) that wraps SQLConf.
|
Caution
|
FIXME |
sessionState Property
sessionState is a transient lazy value that represents the current SessionState.
|
Note
|
sessionState is a private[sql] value so you can only access it in a code inside org.apache.spark.sql package.
|
sessionState is a lazily-created value based on the internal spark.sql.catalogImplementation setting that can be:
-
org.apache.spark.sql.hive.HiveSessionStateforhive -
org.apache.spark.sql.internal.SessionStateforin-memory
streams Attribute
streams: StreamingQueryManager
streams attribute gives access to StreamingQueryManager (through SessionState).
val spark: SparkSession = ...
spark.streams.active.foreach(println)
experimentalMethods Attribute
experimental: ExperimentalMethods
experimentalMethods is an extension point with ExperimentalMethods that is a per-session collection of extra strategies and Rule[LogicalPlan]s.
|
Note
|
experimental is used in SparkPlanner and SparkOptimizer. Hive and Structured Streaming use it for their own extra strategies and optimization rules.
|
newSession method
newSession(): SparkSession
newSession creates (starts) a new SparkSession (with the current SparkContext and SharedState).
scala> println(sc.version)
2.0.0-SNAPSHOT
scala> val newSession = spark.newSession
newSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@122f58a
sharedState Attribute
sharedState is the current SharedState. It is created lazily when first accessed.
SharedState
SharedState is an internal class that holds the shared state across active SQL sessions (as SparkSession instances) by sharing CacheManager, SQLListener, and ExternalCatalog.
|
Tip
|
Enable Add the following line to
Refer to Logging. |
SharedState requires a SparkContext when created. It also adds hive-site.xml to Hadoop’s Configuration in the current SparkContext if found on CLASSPATH.
|
Note
|
hive-site.xml is an optional Hive configuration file when working with Hive in Spark.
|
The fully-qualified class name is org.apache.spark.sql.internal.SharedState.
SharedState is created lazily, i.e. when first accessed after SparkSession is created. It can happen when a new session is created or when the shared services are accessed. It is created with a SparkContext.
When created, SharedState sets hive.metastore.warehouse.dir to spark.sql.warehouse.dir if hive.metastore.warehouse.dir is not set or spark.sql.warehouse.dir is set. Otherwise, when hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not, spark.sql.warehouse.dir gets set to hive.metastore.warehouse.dir. You should see the following INFO message in the logs:
INFO spark.sql.warehouse.dir is not set, but hive.metastore.warehouse.dir is set. Setting spark.sql.warehouse.dir to the value of hive.metastore.warehouse.dir ('[hiveWarehouseDir]').
You should see the following INFO message in the logs:
INFO SharedState: Warehouse path is '[warehousePath]'.
Stopping SparkSession — stop Method
stop(): Unit
stop stops the SparkSession, i.e. stops the underlying SparkContext.
Creating SparkSession Instance
|
Caution
|
FIXME |
baseRelationToDataFrame Method
|
Caution
|
FIXME |