Spark-SQL
[TOC]
Overview
spark sql是spark的一个模块, 用于处理结构化得数据。 与spark rdd api 不同, spark SQl针对结构化数据提供了更多接口。spark sql 应用一些额外的信息来提升性能。 这里有一下几种方式来使用spark sql,包括sql, DataFrames API 和DataSets API。
SQL
Spark SQL可以用于sql查询, 包括正常的SQL或者HiveQL。 Spark SQL也可以用来从已安装的hive中读取数据。 使用编程语言运行SQL, 运行结果会返回一个DataFrame。 SQL可以通过命令行方式运行或者通过JDBC/ODBC
DataFrames
DataFrame基于列存储的分布式数据集合。 理论上等同于关系型数据库的一个table,或者R/Python中的一个数据框架, but with richer optimizations under the hood。 DataFrames 可以从多个数据源组中构建生成, 包括结构化数据文件,hive表,外部数据库以及已经存在的RDD。
Datasets
Datasets是在spark1.6中新增的实验性接口, 意在提高RDD的优势,充分利用Spark SQl 计算引擎的性能。 Dataset 可以从JVM objects 中创建,通过转化操作(如map, flatmap, filter等)来操作。
Getting Started
Starting Point: SQLContext
创建sqlContext
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)除了sqlContext之外,也可以创建hiveContext, 在sqlcontxt之上提供了更多的附加功能, 包括: 使用完整的HiveQL, 使用hive的UDF,从hive表中读取数据。 使用HiveContext, 无需安装hive, 只要数据源在SQLContext可用即可。 HiveContext 只是在spark构建的时候,打包了所有hive的依赖。如果这些依赖对你的应用来说不是一个问题,那推荐使用hiveContext。
SQL中的一些特定的变量可以通过spark.sql.dialect选项来设置。 在SQLContext中可以通过setConf来设置参数, 在SQL的命令行则可以通过set key=value 来设置。
Creating DataFrames
对于sqlContext, 应用可以从RDD, hive Table, 以及数据源来创建DataFrames
DataFrame Operations
Running SQL Queries Programmatically
sqlContext 可以运行sql查询,将结果作为DataFrame返回
Creating Datasets
Datasets 与RDD很相似,但不同于Java Serialization与 kryo的序列化方式,Datasets使用一个特殊的序列化方式Encoder 来在数据处理和传输的时候进行序列化。 尽管encoder与标准的序列化都是将object 转为字节, 但是encoder是编码动态生成以及使用的, 允许spark执行很多操作如过滤,排序,以及hash, 并且不允许反序列化为一个object。
DataFrame通过指定一个类来转化为DataSet
Interoperating with RDDs
spark sql 支持俩种方式将RDD转为DataFrames。 第一种方式是使用反射根据object中包含的类型来推断schema。 这种方式适用于你已经清楚的知道schema的情况下, 代码更简洁,更有效。
第二种方式是通过编程接口来构造一个schema,作用于已经存在的RDD上。 这种方式比较复杂,适用于列与类型都不清楚的情况下构造DataFrames。
Inferring the Schema Using Reflection
spark SQL提供的scala接口支持自动的将包含case class的RDD转为DataFrame。 case class定义了table的schema, case class的变量名被读取出来通过反射变为列明。 case classes可嵌套,或者包含复杂的数据类型,如Sequences或者Array。 RDD可以转为DataFrame,并注册成table。
Programmatically Specifying the Schema
当case class事先无法被定义的时候(类如: 整条记录的结构被编码为一个字符串,或者一个文本数据集被转码出来,不同的用户使用不同的字段的时候), 可以通过以下三部,创建DataFrame 1. 有原始的RDD创建行RDD 2. 根据第一步创建的RDD结构创建schema 结构类型 StructType 3. 通过sqlContext的createDataFrame方式 将schema 应用于行的RDD上
Data Sources
spark SQL 支持对 DataFrame接口加载的各种数据源进行操作。DataFrame 可以被当做一个普通的RDD来操作,也可以注册成为一个临时的表通过sql来访问数据。
Generic Load/Save Functions
Manually Specifying Options
通过一些额外的参数可以手动的指定数据源, 数据源必须比如使用全面(如: org.apache.spark.sql.parquet), 但是内建的数据源可以使用简称(如: json, parquet, jdbc), 任何类型的DataFrame都可以通过这种方式转为令一种类型
Run SQL on files directly
Save Modes
Saving to Persistent Tables
使用HiveContext时, 可以将DataFrame通过saveAsTable命令存储在持久化表中。 不同于registerTempTable, saveAsTable将会保存DataFrame的内容数据,并在HiveMetastore中创建一个指向数据的指针。 当spark程序重新启动后, 只要连接之前的metastore, 持久化表会一直存在。 通过sqlContext的table方法可以重新从持久化表数据中创建table。saveAsTable 默认创建一张可管理的表,也就是说,表数据的位置由metastore来管理。 当表删除后,metadata的数据也会自动删除。
Parquet Files
Parquet 是一种列存储数据结构, spark SQL对其提供了读写功能, 文件的schema保存在原始数据中。 当写Parquet文件时,所有的列都自动转为可为空的压缩格式
Loading Data Programmatically
Partition Discovery
表分区是一种通用的优化方式, 在分区表中,数据通常被存储在不同的目录中, 分区字段的值就是目录名。 Parquet数据源能够自动发现和推倒出分区信息。 例如: 我们以如下目录结构保存数据,gender和country为分区字段。
将路径参数path/to/table 传递给 SQLContext.read.parquet或者 SQLContext.read.load, SparkSQL会自动的从路径上扩展出分区信息。 DataFrame的schema如下:
分区字段的类型也是被自动推倒出来的。 字符串以及数值类型的分区字段都支持。 有时,用户并不希望自动推断分区字段类型, 可以通过 spark.sql.sources.partitionColumnTypeInference.enabled来设置, 默认值为true。 当设置为false后,分区字段为string类型
spark1.6开始,默认的spark只会在给定路径下寻找分区字段。如用户使用path/to/table/gender=male 路径给 SQLContext.read.parquet or SQLContext.read.load, gender并不会作为分区字段。 如果你需要指定根路径从哪儿开始, 可以在datasource 中使用basePath参数。 类如,当数据路径为path/to/table/gender=male, 且用户设置了basePath 为path/to/table/. gender还是分区字段
Schema Merging
与ProtocolBuffer, Avro, and Thrift类似, Parquet也支持schema演化。 用户可以先设置一个简单的schema, 然后逐渐的添加更多需要的字段。 慢慢的,用户会发现有很多的Parquet files,且schema不相同。 Parquet数据源会自动检测到这些年情况,并对这些文件schema进行合并
因为合并schema是一个比较费时的操作,所以从1.5.0开始,默认关闭了这个选项。 在下面情况下,你可以选择开启 1. 当读取Parquet fields 2. 在全局变量spark.sql.parquet.mergeSchema被设置为true时
Hive metastore Parquet table conversion
在读取或者写 Hive metastore Parquet tables时, spark SQL 为了更好的性能, 使用自己的parquet,放弃hive的SerDe。 可以通过spark.sql.hive.convertMetastoreParquet来控制,默认为打开。
Hive/Parquet Schema Reconciliation
Metadata Refreshing
Configuration
spark.sql.parquet.binaryAsString
spark.sql.parquet.int96AsTimestamp
spark.sql.parquet.cacheMetadata
spark.sql.parquet.compression.codec
spark.sql.parquet.filterPushdown
spark.sql.hive.convertMetastoreParquet
spark.sql.parquet.output.committer.class
spark.sql.parquet.mergeSchema
JSON Datasets
通过SQLContext.read.json() 加载一个字符串或者json文件,来生成一个DataFrame
Hive Tables
构造HiveContext, 继承自SQLContext
Interacting with Different Versions of Hive Metastore
spark.sql.hive.metastore.version
spark.sql.hive.metastore.jars
spark.sql.hive.metastore.sharedPrefixes
spark.sql.hive.metastore.barrierPrefixes
JDBC To Other Databases
spark SQL 也可以通过jdbc从其他的数据源中读取数据, 通过JdbcRDD来对数据进行操作。 返回DataFrame的结果,可以很容易在SparkSQL中处理,并且可以很容易的与其他数据源进行join操作。 JDBC datasource 也可以很容易的用java或者python来实现。
使用JDBC 需要现在spark的classpath中放入数据库对应的jdbc driver, 如,连接postgresql数据库,需要执行如下命令:
参数
参数名
意义
url
连接的jdbc url
dbtable
要读取的jdbc 数据表
driver
连接url时需要的jdbc driver名称
partitionColumn,lowerBound,upperBound,numPartitions
These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
fetchSize
决定了每一轮查询返回的行数
Last updated
Was this helpful?