
Big Data 5: Spark SQL

Basic Concepts

  • DataSet
    A distributed collection of data with a strong type: think of it as Spark's strongly typed dataset abstraction, used for distributed computation and data transfer. The source excerpt below carries detailed comments.
package org.apache.spark.sql
...
@Stable
class Dataset[T] private[sql](
    @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
    @DeveloperApi @Unstable @transient val encoder: Encoder[T])
  extends Serializable {

  @transient lazy val sparkSession: SparkSession = {
    if (queryExecution == null || queryExecution.sparkSession == null) {
      throw new SparkException(
      "Dataset transformations and actions can only be invoked by the driver, not inside of" +
        " other Dataset transformations; for example, dataset1.map(x => dataset2.values.count()" +
        " * x) is invalid because the values transformation and count action cannot be " +
        "performed inside of the dataset1.map transformation. For more information," +
        " see SPARK-28702.")
    }
    queryExecution.sparkSession
  }
  • DataFrame
    A DataFrame can be regarded as a special Dataset whose row type is fixed to Row, i.e. Dataset[Row].
package org.apache.spark

import org.apache.spark.annotation.{DeveloperApi, Unstable}
import org.apache.spark.sql.execution.SparkStrategy

/**
 * Allows the execution of relational queries, including those expressed in SQL using Spark.
 *
 *  @groupname dataType Data types
 *  @groupdesc Spark SQL data types.
 *  @groupprio dataType -3
 *  @groupname field Field
 *  @groupprio field -2
 *  @groupname row Row
 *  @groupprio row -1
 */
package object sql {

  @DeveloperApi
  @Unstable
  type Strategy = SparkStrategy
  type DataFrame = Dataset[Row]
  private[sql] val SPARK_VERSION_METADATA_KEY = "org.apache.spark.version"
  private[sql] val SPARK_LEGACY_DATETIME = "org.apache.spark.legacyDateTime"
}

RDD, DataSet and DataFrame

  1. Converting between them


    [Figure: conversion diagram between RDD, DataFrame and Dataset]
// Assumes case class User(name: String, age: Int) and import spark.implicits._
val ds   = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).map(t => User(t._1, t._2)).toDS()  // RDD -> Dataset
val rdd  = ds.rdd                                                                              // Dataset -> RDD
val df   = sc.makeRDD(List(("zhangsan", 30), ("lisi", 49))).toDF("name", "age")                // RDD -> DataFrame
val ds2  = df.as[User]                                                                         // DataFrame -> Dataset
val df2  = ds.toDF()                                                                           // Dataset -> DataFrame
val rdd2 = df.rdd                                                                              // DataFrame -> RDD (rows have type Row)
  2. Shared characteristics
  • All three are distributed, resilient datasets that make processing very large data convenient;
  • All three are lazy: creation and transformations such as map do not execute immediately; the traversal and computation only start when an action such as foreach is encountered;
  • They share many common functions, such as filter and sorting;
  • Many operations on DataFrame and Dataset require import spark.implicits._;
  • All three automatically cache or spill computations according to Spark's memory situation, so even very large data volumes need not cause out-of-memory errors;
  • All three have the concept of partitions;
  • Both DataFrame and Dataset can use pattern matching to extract the value and type of each field, as sketched below.
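For example, a minimal sketch of the pattern-matching point, reusing the df and ds values from the conversion example above and the assumed case class User(name: String, age: Int):
import org.apache.spark.sql.Row

// requires import spark.implicits._ for the String result encoder
// DataFrame rows are untyped, so match on Row and annotate the field types
df.map { case Row(name: String, age: Int) => s"$name is $age" }.show()
// Dataset rows carry the case-class type, so match on the case class directly
ds.map { case User(name, age) => s"$name is $age" }.show()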
  3. Differences
  • RDD
    RDDs are generally used together with Spark MLlib; RDDs do not support Spark SQL operations.
  • DataFrame
    Every row of a DataFrame has the fixed type Row; column values cannot be accessed directly and must be parsed out to obtain each field.
    DataFrame and Dataset are generally not used together with Spark MLlib.
    They support Spark SQL operations such as select and groupBy, and can be registered as temporary tables/views for SQL statements.
    They support some particularly convenient save formats, such as CSV; see the sketch after this list.
  • DataSet
    Dataset and DataFrame have exactly the same member functions; the only difference is the type of each row.
    In a Dataset, the row type is not fixed: it can be any user-defined POJO/case class type.
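A minimal sketch of the DataFrame conveniences listed above (the view name user and the output path /tmp/user_csv are illustrative assumptions):
df.select("name", "age").groupBy("name").count().show()   // SQL-style operators such as select/groupBy
df.createOrReplaceTempView("user")                        // register a temporary view
spark.sql("select name, avg(age) from user group by name").show()
df.write.mode("overwrite").csv("/tmp/user_csv")           // convenient save formats such as CSV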

Common Programming Tasks

  1. Creating the execution environment
    val sparkConf: SparkConf =
          new SparkConf().setAppName("SparkSQL")

    val spark: SparkSession =
          SparkSession.builder()
            .config(sparkConf)
            .getOrCreate()

    val sc = spark.sparkContext
    import spark.implicits._

// variant with Hive support enabled
    val spark: SparkSession =
          SparkSession.builder()
            .config(sparkConf)
            .enableHiveSupport()  // enable Hive support
            .getOrCreate()
  2. Reading data
// json
val df: DataFrame =
  spark
    .read
    .format("json")
    .load("/user/data/1.json")

// parquet
val dfParquet: DataFrame =
  spark
    .read
    .parquet("/user/data/1.parquet")

// mysql
val dfMysql: DataFrame =
  spark
    .read
    .format("jdbc")
    .option("url", "jdbc:mysql://hdp1:3306/test")
    .option("user", "root")
    .option("password", "qwer1234")
    .option("dbtable", "user")
    .load()
  3. Operating on Hive with Spark

val df =  spark.sql("select * from default.test ")

    df.write
       .mode(SaveMode.Overwrite)
       .saveAsTable("default.test")

    df.write.mode(SaveMode.Append).format("parquet").partitionBy("dt").saveAsTable("test")

df.createOrReplaceTempView("test_temp")
spark.sql("insert overwrite table default.test parittion(dt)  select * from test_temp")
  4. UDF functions
 spark
   .udf
   .register("prefixName", 
                 (name:String) => "prefix : " + name  
   )

spark.sql("select prefixName( name ) from user")
  5. UDAF
  • The Aggregator abstract class
package org.apache.spark.sql.expressions

import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression

/**
 * A base class for user-defined aggregations, which can be used in `Dataset` operations to take
 * all of the elements of a group and reduce them to a single value.
 *
 * For example, the following aggregator extracts an `int` from a specific class and adds them up:
 * {{{
 *   case class Data(i: Int)
 *
 *   val customSummer =  new Aggregator[Data, Int, Int] {
 *     def zero: Int = 0
 *     def reduce(b: Int, a: Data): Int = b + a.i
 *     def merge(b1: Int, b2: Int): Int = b1 + b2
 *     def finish(r: Int): Int = r
 *   }.toColumn()
 *
 *   val ds: Dataset[Data] = ...
 *   val aggregated = ds.select(customSummer)
 * }}}
 *
 * Based loosely on Aggregator from Algebird: https://github.com/twitter/algebird
 *
 * @tparam IN The input type for the aggregation.
 * @tparam BUF The type of the intermediate value of the reduction.
 * @tparam OUT The type of the final output result.
 * @since 1.6.0
 */
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {

  /**
   * A zero value for this aggregation. Should satisfy the property that any b + zero = b.
   * @since 1.6.0
   */
  def zero: BUF

  /**
   * Combine two values to produce a new value.  For performance, the function may modify `b` and
   * return it instead of constructing new object for b.
   * @since 1.6.0
   */
  def reduce(b: BUF, a: IN): BUF

  /**
   * Merge two intermediate values.
   * @since 1.6.0
   */
  def merge(b1: BUF, b2: BUF): BUF

  /**
   * Transform the output of the reduction.
   * @since 1.6.0
   */
  def finish(reduction: BUF): OUT

  /**
   * Specifies the `Encoder` for the intermediate value type.
   * @since 2.0.0
   */
  def bufferEncoder: Encoder[BUF]

  /**
   * Specifies the `Encoder` for the final output value type.
   * @since 2.0.0
   */
  def outputEncoder: Encoder[OUT]

  /**
   * Returns this `Aggregator` as a `TypedColumn` that can be used in `Dataset`.
   * operations.
   * @since 1.6.0
   */
  def toColumn: TypedColumn[IN, OUT] = {
    implicit val bEncoder = bufferEncoder
    implicit val cEncoder = outputEncoder

    val expr =
      AggregateExpression(
        TypedAggregateExpression(this),
        Complete,
        isDistinct = false)

    new TypedColumn[IN, OUT](expr, encoderFor[OUT])
  }
}

  • Example aggregator
  import org.apache.spark.sql.{Encoder, Encoders}
  import org.apache.spark.sql.expressions.Aggregator

  case class AgeBuffer(var totalAge: Long, var totalCount: Long)

  /**
    * Custom UDAF that computes the average age.
    *
    * 1. Extend Aggregator
    * 2. Specify the type parameters:
    *     IN:  Long
    *     BUF: the buffer type, AgeBuffer
    *     OUT: Double
    */
  class AGE_Avg_Aggregator extends Aggregator[Long, AgeBuffer, Double] {
    // initialize the buffer
    override def zero: AgeBuffer = AgeBuffer(0L, 0L)

    // fold an incoming value (age) into the buffer
    override def reduce(buffer: AgeBuffer, age: Long): AgeBuffer = {
      buffer.totalAge += age
      buffer.totalCount += 1
      buffer
    }

    // merge two partial buffers
    override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
      b1.totalAge += b2.totalAge
      b1.totalCount += b2.totalCount
      b1
    }

    // produce the final result from the buffer
    override def finish(buffer: AgeBuffer): Double = {
      buffer.totalAge / buffer.totalCount.toDouble
    }

    // encoder for the custom buffer type: Encoders.product
    override def bufferEncoder: Encoder[AgeBuffer] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }
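A hedged sketch of registering and invoking the aggregator above (Spark 3.x functions.udaf; the view name user and column age are assumptions):
import org.apache.spark.sql.functions

// register the Aggregator as a SQL function (Spark 3.0+)
spark.udf.register("ageAvg", functions.udaf(new AGE_Avg_Aggregator))
spark.sql("select ageAvg(age) from user").show()

// or use it as a typed column on a Dataset[Long]
// ageDs.select(new AGE_Avg_Aggregator().toColumn).show()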

Maven dependencies

     <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_{scala-version}</artifactId>
            <version>{spark-version}</version>
     </dependency>

     <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_{scala-version}</artifactId>
            <version>{spark-version}</version>
     </dependency>

     <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_{scala-version}</artifactId>
            <version>{spark-version}</version>
     </dependency>

Common Optimizations

  • Number of shuffle partitions
    When a Spark SQL job produces a shuffle, the default number of partitions (spark.sql.shuffle.partitions) is 200; in real projects it should be set to a value appropriate for the workload, as in the sketch below.
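A minimal sketch of adjusting it (400 is only an illustrative value, not a recommendation):
// runtime SQL conf, affects subsequent shuffles in this session
spark.conf.set("spark.sql.shuffle.partitions", "400")
// or at submit time:
//   spark-submit --conf spark.sql.shuffle.partitions=400 ...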

  • Triggering the bypass mechanism
    The number of shuffle map tasks is smaller than the value of spark.shuffle.sort.bypassMergeThreshold (default 200), and
    the shuffle operator is not a map-side combine (aggregation) operator (reduceByKey, for example, does map-side combining, so it does not qualify). See the sketch below.
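A hedged sketch of raising the threshold; since this is a core Spark setting rather than a SQL conf, it is set on the SparkConf before the session is created (400 is only an illustration):
val sparkConf = new SparkConf()
  .setAppName("SparkSQL")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")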

  • JOIN optimization
Reference: https://www.cnblogs.com/0xcafedaddy/p/7614299.html
1. Broadcast Hash Join: suitable for joining a very small table with a large table.
2. Shuffle Hash Join: suitable for joining a small table (somewhat larger than in the previous case) with a large table.
3. Sort Merge Join: suitable for joining two large tables. A broadcast-join sketch follows.
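A minimal sketch of forcing a broadcast hash join (smallDf, bigDf and the join key id are hypothetical):
import org.apache.spark.sql.functions.broadcast

val joined = bigDf.join(broadcast(smallDf), Seq("id"))

// or raise the size threshold (in bytes) under which Spark broadcasts automatically
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)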
  • Reducing small files
// Reference: https://blog.csdn.net/wangwangstone/article/details/128849898
spark.sql("""
    insert into tableNew partition(...)
    select ... from tableOld
    distribute by dt, cast(rand() * 3 as int)  -- distribute by the partition column (e.g. dt) plus a random bucket
""")

// distribute by cast(rand() * N as int): N is the number of files that will finally be written per partition

// Alternatively, control the number of partitions (and thus files) when writing out a DataFrame
df_data_out.coalesce(3).write.mode("overwrite").orc(file_path)
df_data_out.repartition(3).write.mode("overwrite").orc(file_path)
  • Use persistence and caching appropriately, e.g. as sketched below.
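A minimal sketch of caching a DataFrame that is reused across several actions (MEMORY_AND_DISK is one common choice; df is assumed from earlier examples):
import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   // or df.cache() for the default storage level
df.count()                                 // first action materializes the cache
// ... further reuse of df ...
df.unpersist()                             // release the cache when no longer needed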

