可以从各种结构化数据源中读取数据 JSON Hive等
不仅支持在spark内使用SQL语句进行数据查询,也支持从类似商业软件中通过标准数据库连接器连接spark SQL进行查询
在spark内部使用spark SQL时,支持SQL与常规的python java scala代码整合
spark SQL 提供一种特殊的RDD, schemaRDD,存放Row对象,每个Row对象代表一行记录,在内部可以利用结构信息更高效的存储数据,还支持运行SQL查询,可以从外部数据源创建,也可以从查询结果或普通RDD创建(新版本中schemaRDD已被DataFrame代替);
Hive 是hadoop上的SQL引擎,spark SQL 编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL 可以支持Hive 表访问、UDF、SERDE、以及HIve查询语言。如果要在spark SQL中包含Hive库,不需要事先安装Hive,最好在编译spark SQL时引入Hive支持。
根据是否使用Hive,有两个不同的入口,HiveContext可以提供HiveQL以及其他依赖于Hive功能的支持。SQLContext则支持spark SQL功能的一个子集,去掉了Hive的支持。
#python
from pyspark.sql import HiveContext,Row
from pyspark.sql import SQLContext, Row
sc= SparkContext();
Hivesc= SQLContext();
#java
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.Row;
JavaSparkContext sc = new JavaSparkContext();
SQLContext sqlctx = new SQLContext(sc);
要在一张数据表上进行查询,需要调用HiveContext或SQLContext中的sql方法,首先要告诉SQL要查询的数据时什么,需要先从json文件中读取数据,并把这些数据注册为一张临时表
#python
tweets= hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results=hiveCtx.sql("SELECT user.name, text FROM tweets")
#java
SchemaRDD tweets = hiveCtx.jsonFile(jsonFile);
tweets.registetrTempTable("tweets');
SchemaRDD results = hiveCtx.sql("SELECT user.name,text FROM tweets");
SchemaRDD与传统数据库中的表的概念类似,是由Row对象组成的RDD Row只是对基本数据类型的数组的封装,可以对其应用已有的RDD操作,同时可以将任意的schemaRDD注册为临时表进行查询
Row对象时schemaRDD中的记录,本质就是一个数组,可以通过get方法读入一个列序号返回一个obejct类型
#python
toptweettext = toptweets.map(lambda row: row.getString(0))//row.text
#java
JavaRDD<String> toptweettext = toptweets.toJavaRDD().map(new Function <Row,String>(){
public String call(Row row){
return row.getString(0);
}
})
sparkSQL的缓存机制与spark中略有不同,由于我们知道每个列的类型信息,所以spark可以更高效的存储数据,使用专门的hivectx.cacheTable("tablename"),当存储数据表时,sql使用一种列式存储格式在内存中表示数据。
也可以使用SQL语句缓存表 CACHE TABLE tablename 或 UNCACHE TABLE tablename
sparkSQL支持多种结构化数据源,从中读取到row对象,包括Hive表,JSON和parquet文件。也可以在程序中通过指定结构信息将常规的RDD转化为schemaRDD,使得SQL更简单
Hive
spark SQL连接到已部署号的Hive需要提供Hive配置,将hive-site.xml文件复制到spark的conf目录下,如果没有,spark sql会使用本地的Hive元数据仓
JSON
当json文件中的记录遵循同样结构信息,那么sparksql可以通过扫描文集推测出结构信息,并且可以使用名字访问对应字段 jsonfile
基于RDD
#python
haRDD=sc.parrallelize([Row(name="holden",favourite="coffee")])
haschemaRDD=hiveCtx.inferSchema(haRDD)
haschemaRDD.registerTempTable("happypeople')
#java
hiveCtx.applySchema(RDD,class)
用户自定义函数UDF
可以使用spark 支持的编程语言编写好函数,然后通过spark sql内建的方法传递进来。在scala和python中可以利用语言原生的函数和lambda语法的支持,在java中需要扩展对应的UDF类
# python
hivectx.registerFunction("strlen",lambda x :len(x),IntegerType())
lengthschemaRDD = hivectx.sql("SELECT strlen('text') FROM tweets LIMIT 10")
#JAVA
import org.apache.spark.sql.api.java.UDF1;
hivectx.udf().register("strlen", new UDF1<String, Integer>(){
@Override
public Integer call (String s) throws Exception{
return s.length();}}, DataTypes.IntegerType);
schemaRDD tweetlength=hivectx.sql("SELECT strlen('text') FROM tweets LIMIT 10");