// Found at http://stackoverflow.com/a/32514683/1305344
val dataset = Seq(
"08/11/2015",
"09/11/2015",
"09/12/2015").toDF("date_string")
dataset.registerTempTable("dates")
// Inside spark-shell
scala > sql(
"""SELECT date_string,
from_unixtime(unix_timestamp(date_string,'MM/dd/yyyy'), 'EEEEE') AS dow
FROM dates""").show
+-----------+--------+
|date_string| dow|
+-----------+--------+
| 08/11/2015| Tuesday|
| 09/11/2015| Friday|
| 09/12/2015|Saturday|
+-----------+--------+
Spark SQL — 查询大规模结构化数据
Spark SQL 主要用于查询结构化和半结构化数据,使用 Dataset 操作数据
从Spark2.0最近的变更来看,Sapark SQL变的越来越重要,功能也越来越多 正在成为主要的数据操作方式
像SQL和NOSQL数据苦,Spark SQL也提供查询优化,Spark SQL的查询优化是通过Catalyst实现的.优化主要有: - 逻辑语法树优化 - 代码生成n (自动生成的代码通常比你手写的代码更高效) - Tungsten execution engine - Internal Binary Row Format.
Note
|
关于 Apache Drill 的表述同样也适用于Spark SQL:
|
来一段代码展示 一个简单的ETL流程 从一个JSON文件中读取部分数据另存为CSV文件.
spark.read
.format("json")
.load("input-json")
.select("name", "score")
.where($"score" > 15)
.write
.format("csv")
.save("output-csv")
使用 结构化数据流 功能可以将静态的批处理查询转化为动态延续的实时应用 , 上代码:
import org.apache.spark.sql.types._
val schema = StructType(
StructField("id", LongType, nullable = false) ::
StructField("name", StringType, nullable = false) ::
StructField("score", DoubleType, nullable = false) :: Nil)
spark.readStream
.format("json")
.schema(schema)
.load("input-json")
.select("name", "score")
.where('score > 15)
.writeStream
.format("console")
.start
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+
在Spark2.0里最主要的数据操作对象是 Dataset. DataSet表示已知模式(schema)信息的 结构化数据 . Dataset表示的结构化数据可以使用 compact binary representation 列压缩格式保存到Java虚拟你的堆外内存中以减少内存占用和垃圾回收时间.
Spark SQL支持 过滤下推(predicate pushdown) 和 运行时生成优化代码(generate optimized code at runtime)以提高查询执行效率.
Spark SQL提高的API:
-
Dataset API (类似 DataFrame API) 强类型一致的使用方法跟LINQ很像.
-
如果你不想编写代码可以直接使用SQL,Spark SQL良好的集成了Hive
-
如果你喜欢用JDBC 可以使用 (through Thrift JDBC/ODBC Server) 以JDBC的方式连接Spark分布式查询引擎.
Spark SQL提供了访问分布式存储上(如 Cassandra,HDFS(Hive,Parquet,JSON))数据的(形式)统一的访问接口 DataFrameReader 和DataFrameWriter.
Spark SQL定义了三种类型的函数:
-
内建函数 和 用户定义函数 (UDFs): 以单行数据为参数并为每行数据返回一个处理结果.
-
聚合函数: 以一组(包括多行)数据为参数并为每组数据返回一个处理结果
-
窗口聚合函数 (Windows): 以一组(包含多行)数据为参数并为每组数据的每一行都返回一个处理结果
支持两种元信息(catalog)存储方式 — 1.保存在内存种
(默认方式) 2.使用Hive的元信息
— 可以通过 spark.sql.catalogImplementation 设置 使用的元信息存储方式.
实例代码:
// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
.agg(max('j).as("aggOrdering"))
.orderBy(sum('j))
.as[(Int, Int)]
query.collect contains (1, 2) // true
// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
| 1|
| 0|
+-------------------+