Basic Concepts
- DataSet
A strongly typed, distributed collection of data; the Spark source excerpt below shows its core definition.
package org.apache.spark.sql
...
@Stable
class Dataset[T] private[sql](
    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  @transient lazy val sparkSession: SparkSession = {
    if (queryExecution == null || queryExecution.sparkSession == null) {
      throw new SparkException(
        "Dataset transformations and actions can only be invoked by the driver, not inside of" +
          " other Dataset transformations; for example, dataset1.map(x => dataset2.values.count()" +
          " * x) is invalid because the values transformation and count action cannot be " +
          "performed inside of the dataset1.map transformation. For more information," +
          " see SPARK-28702.")
    }
    queryExecution.sparkSession
  }
- DataFrame
A DataFrame can be viewed as a special kind of DataSet whose row type is Row, i.e. Dataset[Row].
package org.apache.spark
import org.apache.spark.annotation.{DeveloperApi, Unstable}
import org.apache.spark.sql.execution.SparkStrategy
/**
* Allows the execution of relational queries, including those expressed in SQL using Spark.
*
* @groupname dataType Data types
* @groupdesc Spark SQL data types.
* @groupprio dataType -3
* @groupname field Field
* @groupprio field -2
* @groupname row Row
* @groupprio row -1
*/
package object sql {

  @DeveloperApi
  @Unstable
  type Strategy = SparkStrategy

  type DataFrame = Dataset[Row]

  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"

  private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"
}
RDD, DataSet, and DataFrame
- Converting between them
case class User(name: String, age: Int)
import spark.implicits._

// RDD -> DataSet
val ds = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).map(t => User(t._1, t._2)).toDS()
// DataSet -> RDD
val rdd = ds.rdd
// RDD -> DataFrame
val df = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).toDF("name", "age")
// DataFrame -> DataSet
val dsFromDf = df.as[User]
- Common characteristics
- All three are distributed, resilient datasets, which makes processing very large data convenient.
- All three are lazily evaluated: creation and transformations such as map do not execute immediately; computation only starts once an action such as foreach is triggered.
- They share many common functions, such as filter, sorting, etc.
- Many operations on DataFrame and Dataset require import spark.implicits._
- All three manage memory automatically during computation (spilling to disk when needed), so even very large data volumes need not cause out-of-memory errors.
- All three have the concept of partitions.
- Both DataFrame and DataSet support pattern matching to extract each field's value and type (see the sketch after this list).
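A minimal sketch of the last point, reusing the df with (name, age) columns from the conversion example above (the column types are assumed to be String and Int):
import org.apache.spark.sql.Row

// Pattern match on each Row to pull out field values together with their types
df.map {
  case Row(name: String, age: Int) => s"$name is $age years old"
}.show()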
- Differences
- RDD
RDDs are generally used together with spark mllib; RDDs do not support Spark SQL operations.
- DataFrame
Every row of a DataFrame has the fixed type Row; column values cannot be accessed directly and each field must be extracted (e.g. by index or field name).
DataFrame and DataSet are generally not used together with spark mllib.
Both support Spark SQL operations such as select and groupBy, and can be registered as temporary tables/views for SQL statements.
Both support convenient save formats, such as saving to csv.
- DataSet
Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.
In a Dataset, the row type is not fixed; it can be any user-defined type (e.g. a case class / POJO). A short sketch contrasting the two access styles follows this list.
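A minimal sketch of the contrast, reusing df and the User case class from the conversion example above:
// DataFrame: values must be pulled out of the generic Row
df.collect().foreach { row =>
  val name = row.getAs[String]("name")
  val age  = row.getAs[Int]("age")
  println(s"$name, $age")
}

// DataSet: fields are accessed directly through the typed object
val users = df.as[User]
users.map(u => u.age + 1).show()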
Common Programming
- Creating the execution environment
val sparkConf: SparkConf = new SparkConf().setAppName("SparkSQL")
val spark: SparkSession = SparkSession.builder()
  .config(sparkConf)
  .getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

// With Hive support enabled
val spark: SparkSession = SparkSession.builder()
  .config(sparkConf)
  .enableHiveSupport() // enable Hive support
  .getOrCreate()
- Reading data
// JSON
val df: DataFrame = spark.read
  .format("json")
  .load("/user/data/1.json")

// Parquet
val dfParquet: DataFrame = spark.read
  .parquet("/user/data/1.parquet")

// MySQL via JDBC (the MySQL JDBC driver must be on the classpath)
val dfMysql: DataFrame = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://hdp1:3306/test")
  .option("user", "root")
  .option("password", "qwer1234")
  .option("dbtable", "user")
  .load()
- Spark operating on Hive
// Query a Hive table
val df = spark.sql("select * from default.test")

// Overwrite a Hive table
df.write
  .mode(SaveMode.Overwrite)
  .saveAsTable("default.test")

// Append to a partitioned table stored as parquet
df.write.mode(SaveMode.Append).format("parquet").partitionBy("dt").saveAsTable("test")

// Dynamic-partition insert through SQL
df.createOrReplaceTempView("test_temp")
spark.sql("insert overwrite table default.test partition(dt) select * from test_temp")
- UDF函数
spark
.udf
.register("prefixName",
(name:String) => "prefix : " + name
)
spark.sql("select prefixName( name ) from user")
- UDAF
- The Aggregator abstract class
package org.apache.spark.sql.expressions
import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
/**
* A base class for user-defined aggregations, which can be used in `Dataset` operations to take
* all of the elements of a group and reduce them to a single value.
*
* For example, the following aggregator extracts an `int` from a specific class and adds them up:
* {{{
* case class Data(i: Int)
*
* val customSummer = new Aggregator[Data, Int, Int] {
* def zero: Int = 0
* def reduce(b: Int, a: Data): Int = b + a.i
* def merge(b1: Int, b2: Int): Int = b1 + b2
* def finish(r: Int): Int = r
* }.toColumn()
*
* val ds: Dataset[Data] = ...
* val aggregated = ds.select(customSummer)
* }}}
*
* Based loosely on Aggregator from Algebird: https://github.com/twitter/algebird
*
* @tparam IN The input type for the aggregation.
* @tparam BUF The type of the intermediate value of the reduction.
* @tparam OUT The type of the final output result.
* @since 1.6.0
*/
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {

  /**
   * A zero value for this aggregation. Should satisfy the property that any b + zero = b.
   * @since 1.6.0
   */
  def zero: BUF

  /**
   * Combine two values to produce a new value. For performance, the function may modify `b` and
   * return it instead of constructing new object for b.
   * @since 1.6.0
   */
  def reduce(b: BUF, a: IN): BUF

  /**
   * Merge two intermediate values.
   * @since 1.6.0
   */
  def merge(b1: BUF, b2: BUF): BUF

  /**
   * Transform the output of the reduction.
   * @since 1.6.0
   */
  def finish(reduction: BUF): OUT

  /**
   * Specifies the `Encoder` for the intermediate value type.
   * @since 2.0.0
   */
  def bufferEncoder: Encoder[BUF]

  /**
   * Specifies the `Encoder` for the final output value type.
   * @since 2.0.0
   */
  def outputEncoder: Encoder[OUT]

  /**
   * Returns this `Aggregator` as a `TypedColumn` that can be used in `Dataset` operations.
   * @since 1.6.0
   */
  def toColumn: TypedColumn[IN, OUT] = {
    implicit val bEncoder = bufferEncoder
    implicit val cEncoder = outputEncoder

    val expr = AggregateExpression(
      TypedAggregateExpression(this),
      Complete,
      isDistinct = false)

    new TypedColumn[IN, OUT](expr, encoderFor[OUT])
  }
}
- Example function
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class AgeBuffer(var totalAge: Long, var totalCount: Long)

/**
 * Custom UDAF that computes the average age.
 *
 * 1. Extend Aggregator
 * 2. Specify the type parameters
 *    IN:  Long       (input age)
 *    BUF: AgeBuffer  (intermediate buffer)
 *    OUT: Double     (average)
 */
class AGE_Avg_Aggregator extends Aggregator[Long, AgeBuffer, Double] {
  // Initialize the buffer
  override def zero: AgeBuffer = AgeBuffer(0L, 0L)

  // Fold one input value (age) into the buffer
  override def reduce(buffer: AgeBuffer, age: Long): AgeBuffer = {
    buffer.totalAge += age
    buffer.totalCount += 1
    buffer
  }

  // Merge buffers produced on different partitions
  override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
    b1.totalAge += b2.totalAge
    b1.totalCount += b2.totalCount
    b1
  }

  // Produce the final result from the buffer
  override def finish(buffer: AgeBuffer): Double = {
    buffer.totalAge / buffer.totalCount.toDouble
  }

  // Encoder for the custom buffer type: Encoders.product
  override def bufferEncoder: Encoder[AgeBuffer] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
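A minimal usage sketch, assuming Spark 3.x (where functions.udaf is available) and a user table/view with a numeric age column; "ageAvg" is an illustrative registration name:
import org.apache.spark.sql.functions
import spark.implicits._

// Register the Aggregator as an untyped UDAF for SQL
spark.udf.register("ageAvg", functions.udaf(new AGE_Avg_Aggregator))
spark.sql("select ageAvg(age) from user").show()

// Or use it as a typed column on a Dataset[Long]
val ages = Seq(30L, 49L).toDS()
ages.select(new AGE_Avg_Aggregator().toColumn).show()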
Maven dependencies
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_{scala-version}</artifactId>
  <version>{spark-version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_{scala-version}</artifactId>
  <version>{spark-version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_{scala-version}</artifactId>
  <version>{spark-version}</version>
</dependency>
Common Optimizations
- Shuffle partition count
When a Spark SQL job produces a shuffle, the default number of shuffle partitions (spark.sql.shuffle.partitions) is 200; set it to a reasonable value for real workloads (see the sketch after this item).
The bypass merge-sort mechanism is triggered when:
- the number of shuffle map tasks is smaller than the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200), and
- the shuffle operator does not perform map-side combine aggregation (for example, reduceByKey has map-side combine, so it does not qualify).
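A minimal sketch of adjusting the setting (the value 64 is only an illustrative choice):
// Adjust at runtime for the current session
spark.conf.set("spark.sql.shuffle.partitions", "64")

// Or fix it at submit time:
//   spark-submit --conf spark.sql.shuffle.partitions=64 ...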
- JOIN optimization
Reference: https://www.cnblogs.com/0xcafedaddy/p/7614299.html
1. Broadcast Hash Join: suitable for joining a very small table with a large table.
2. Shuffle Hash Join: suitable for joining a small table (somewhat larger than in the previous case) with a large table.
3. Sort Merge Join: suitable for joining two large tables. A broadcast-hint sketch follows below.
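A minimal sketch of forcing a Broadcast Hash Join with a hint, assuming largeDf and smallDf are the two DataFrames and they share a key column id (both names are illustrative):
import org.apache.spark.sql.functions.broadcast

// Hint the optimizer to broadcast the small side instead of shuffling both sides.
// Spark also broadcasts automatically when the small side is below spark.sql.autoBroadcastJoinThreshold (default 10 MB).
val joined = largeDf.join(broadcast(smallDf), Seq("id"), "inner")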
- Reducing small files
Reference: https://blog.csdn.net/wangwangstone/article/details/128849898
// SQL approach: DISTRIBUTE BY bounds the number of files written per partition
spark.sql("""
  insert into tableNew ... partition(...)
  select ... from tableOld
  distribute by dt, cast(rand() * 3 as int)
  -- distribute by <partition column, e.g. dt>, cast(rand() * N as int): N is the number of files that end up being written
""")

// DataFrame approach: control the number of partitions when writing
df_data_out.coalesce(3).write.mode("overwrite").orc(file_path)
df_data_out.repartition(3).write.mode("overwrite").orc(file_path)
- Use persistence and caching appropriately (a minimal sketch follows).
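A minimal sketch of caching a DataFrame that is reused across several actions (df is assumed to be such a DataFrame):
import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK) // or df.cache() for the default storage level
df.count()     // the first action materializes the cache
df.show()      // later actions reuse the cached data
df.unpersist() // release the cache when it is no longer needed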