Spark SQL — 查询大规模结构化数据

Spark SQL 主要用于查询结构化和半结构化数据,使用 Dataset 操作数据

从Spark2.0最近的变更来看,Sapark SQL变的越来越重要,功能也越来越多 正在成为主要的数据操作方式

// Found at http://stackoverflow.com/a/32514683/1305344
val dataset = Seq(
   "08/11/2015",
   "09/11/2015",
   "09/12/2015").toDF("date_string")

dataset.registerTempTable("dates")

// Inside spark-shell
scala > sql(
  """SELECT date_string,
        from_unixtime(unix_timestamp(date_string,'MM/dd/yyyy'), 'EEEEE') AS dow
      FROM dates""").show
+-----------+--------+
|date_string|     dow|
+-----------+--------+
| 08/11/2015| Tuesday|
| 09/11/2015|  Friday|
| 09/12/2015|Saturday|
+-----------+--------+

像SQL和NOSQL数据苦,Spark SQL也提供查询优化,Spark SQL的查询优化是通过Catalyst实现的.优化主要有: - 逻辑语法树优化 - 代码生成n (自动生成的代码通常比你手写的代码更高效) - Tungsten execution engine - Internal Binary Row Format.

Spark SQL用 Dataset (以前是 DataFrame)表示类表类数据. Dataset 被设计用来在Spark上简单高效的处理类表类数据.

Note

关于 Apache Drill 的表述同样也适用于Spark SQL:

一个不需要特别指定原数据的既适用于关系性数据库又适用于NoSQL数据库,既可以查询结构化数据又可以查询非结构数据(JSON,Parquet,HBase)的查询引擎.

来一段代码展示 一个简单的ETL流程 从一个JSON文件中读取部分数据另存为CSV文件.

spark.read
  .format("json")
  .load("input-json")
  .select("name", "score")
  .where($"score" > 15)
  .write
  .format("csv")
  .save("output-csv")

使用 结构化数据流 功能可以将静态的批处理查询转化为动态延续的实时应用 , 上代码:

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

spark.readStream
  .format("json")
  .schema(schema)
  .load("input-json")
  .select("name", "score")
  .where('score > 15)
  .writeStream
  .format("console")
  .start

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +-----+-----+
// | name|score|
// +-----+-----+
// |Jacek| 20.5|
// +-----+-----+

在Spark2.0里最主要的数据操作对象是 Dataset. DataSet表示已知模式(schema)信息的 结构化数据 . Dataset表示的结构化数据可以使用 compact binary representation 列压缩格式保存到Java虚拟你的堆外内存中以减少内存占用和垃圾回收时间.

Spark SQL提高的API:

  1. Dataset API (类似 DataFrame API) 强类型一致的使用方法跟LINQ很像.

  2. Structured Streaming API (Streaming Datasets) 用于持续增量查询

  3. 如果你不想编写代码可以直接使用SQL,Spark SQL良好的集成了Hive

  4. 如果你喜欢用JDBC 可以使用 (through Thrift JDBC/ODBC Server) 以JDBC的方式连接Spark分布式查询引擎.

Spark SQL提供了访问分布式存储上(如 Cassandra,HDFS(Hive,Parquet,JSON))数据的(形式)统一的访问接口 DataFrameReaderDataFrameWriter.

Spark SQL定义了三种类型的函数:

支持两种元信息(catalog)存储方式 — 1.保存在内存种 (默认方式) 2.使用Hive的元信息 — 可以通过 spark.sql.catalogImplementation 设置 使用的元信息存储方式.

实例代码:

// Example 1
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
  .agg(max('j).as("aggOrdering"))
  .orderBy(sum('j))
  .as[(Int, Int)]
query.collect contains (1, 2) // true

// Example 2
val df = Seq((1, 1), (-1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
scala> sql("SELECT IF(a > 0, a, 0) FROM (SELECT key a FROM src) temp").show
+-------------------+
|(IF((a > 0), a, 0))|
+-------------------+
|                  1|
|                  0|
+-------------------+

results matching ""

    No results matching ""