当前位置: 首页>后端>正文

大数据学习之(二十一)structured streaming 基于事件时间窗口操作

简介

根据事件时间进行滑动窗口计算。

  • 必须要TimeStamp列

  • 默认每次输出都包含所有窗口对应的结果值

  • 处理迟到的事件 watermarking(容忍or丢弃)

  • 窗口滑动时间必须小于等于窗口时间。

  • 窗口划分规则

    根据窗口时间和滑动时间来判断输入的事件时间该条数据属于哪个窗口不好判断,所以这里预估多了一些窗口,并在输出时过滤有效的窗口。

  • 输出模式

    • outputMode("complete") 维护所有数据状态,无论迟到的数据有没有小于watermark。

demo源码

package com.mashibing.stscode.scalacode.windows

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable

/**
  *  实时读取scoket数据,对数据按照事件时间进行划分窗口统计wordcount
  *   1641780000000 zhangsan,lisi,maliu,zhangsan
  *   1641780002000 zhangsan,lisi,wangwu
  *   1641780005000 lisi,maliu,lisi
  *   1641780010000 zhangsan,lisi
  *   1641780003000 wangwu,zhangsan
  */
object WindowOnEventTime {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WindowOnEventTime")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    val df: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()

    //处理数据,将数据中的时间列转换成时间戳类型: 1641780000000 zhangsan,lisi,maliu,zhangsan
    val tsAndWordsDF : DataFrame = df.as[String].flatMap(line=>{
      val ts: String = line.split(" ")(0)
      val arr: mutable.ArraySeq[(Timestamp, String)] = line.split(" ")(1).split(",").map(word => {
        (new Timestamp(ts.toLong), word)
      })
      arr
    }).toDF("timestamp","word")

    //4.使用window 必须导入以下functions函数
    import org.apache.spark.sql.functions._
    //设置窗口
       //将数据按照窗口和单词分组,对每个窗口内数据进行统计,统计之后的DataFrame 多一个window Struct 类型的字段,包含窗口起始时间
    val transDF: DataFrame = tsAndWordsDF.groupBy(window($"timestamp", "10 seconds", "3 seconds"),
      $"word"
    ).count()

    transDF.printSchema()
 //5.获取设置窗口后的数据
    val result: DataFrame = transDF.map(row => {
      val startTime: Timestamp = row.getStruct(0).getTimestamp(0)
      val endTime: Timestamp = row.getStruct(0).getTimestamp(1)
      val word: String = row.getString(1)
      val count: Long = row.getLong(2)

      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

      (sdf.format(startTime.getTime), sdf.format(endTime.getTime), word, count)
    }).toDF("start", "end", "word", "count")

    val query: StreamingQuery = result.orderBy("start","end").writeStream
      .format("console")
      .outputMode("complete")
      .start()

    query.awaitTermination()


  }

}
#node1
nc -l 9999
#依次输入如下数据
   参考时间              timestamp       words
2022-01-10 10:00:00    1641780000000 zhangsan,lisi,maliu,zhangsan
2022-01-10 10:00:02    1641780002000 zhangsan,lisi,wangwu
2022-01-10 10:00:05    1641780005000 lisi,maliu,lisi
2022-01-10 10:00:10    1641780010000 zhangsan,lisi
2022-01-10 10:00:03    1641780003000 wangwu,zhangsan

watermarking

WaterMarking机制主要解决了延迟数据是否聚合和减少内存聚合状态问题。

作用

  1. 系统可以删除过期的状态数据,用于释放内存。
  2. 晚到的事件数据是否丢弃。

计算公式

*Watermark时间(T) = 最后触发窗口内最大的事件时间(MaxTime) - 允许数据迟到的时间(LateTime)*

每个窗口都有start time和end time属性,当watermark T时间大于一个窗口的end time后,当前窗口的状态会被系统丢弃。

注意

  1. 引入watermark之后代码中outputMode只能设置update、append模式系统才会删除过期数据,设置complete不会删除过期数据。设置update更新模式时,如果watermark值没过时间窗口的end time之前,如果有迟到数据落入到该窗口,该窗口会重复触发。
  2. watermak在非流数据处理上没有任何作用。
  3. watermark 可以基于窗口,也可以基于 eventtime事件时间本身。

output mode

complete

  1. 某些操作必须在complete模式下使用,比如 order by,在ouput update模式下会报错。
  2. 所有状态保留,事件保留。

update

  1. 窗口存在重复触发的可能:迟到的数据落入该窗口
  2. 状态,事件 根据water marking 废弃

append

  1. watermark必须大于等于一个窗口结束时间,那么这个窗口数据才会被输出
  2. 状态,事件 根据water marking 废弃

窗口类型

滑动窗口

目前演示的,默认的

滚动窗口

固定时间大小,不重叠的,连续时间间隔的窗口,也就是步长等于窗口长度的滑动窗口。

会话窗口

  1. 大小是动态不固定的,
  2. 会话窗口从输入开始如果在间隔时间内收到后续输入,则窗口长度自动扩展,如果在指定的间隔时间内没有接收到数据,则会话窗口自动关闭。
  3. 会话窗口中如果groupBy中有对应的列,是根据列字段划分窗口。
    比如demo中根据groupby时的 word,每个word 一个窗口。
  4. 可以根据设置的列条件****动态设置间隔时间。

join

StructuredStreaming结构化流支持与静态Dataset/DataFrame进行join,也支持和流式的Dataset/DataFrame进行join。

流与静态数据join

package com.mashibing.stscode.scalacode.jointest

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  Structured Streaming 流与静态数据join 关联
  */
object StreamAndStaticJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamAndStaticJoin")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    //创建 批量的DataFrame
    val list = List[String](
      "{\"aid\":1,\"name\":\"zs\",\"age\":18}",
      "{\"aid\":2,\"name\":\"ls\",\"age\":19}",
      "{\"aid\":3,\"name\":\"ww\",\"age\":20}",
      "{\"aid\":4,\"name\":\"ml\",\"age\":21}"
    )
    val personInfo: DataFrame = spark.read.json(list.toDS())

    //创建流式数据
    /**
      * 1,zs,100
      * 2,ls,200
      * 3,ww,300
      * 5,tq,500
      */
    val scoreInfo: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1")
      .option("port", 9999)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0).toInt, arr(1), arr(2).toInt)
      }).toDF("bid", "name", "score")

    //将流数据与静态数据进行关联
    val result: DataFrame = scoreInfo.join(personInfo,scoreInfo.col("bid") === personInfo.col("aid"),"left_semi")

    result.printSchema()

    result.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()
      .awaitTermination()

  }

}

输出如下,注意 join类型

-------------------------------------------
Batch: 1
-------------------------------------------
+---+----+-----+
|bid|name|score|
+---+----+-----+
|  1|  zs|  100|
|  2|  ls|  200|
|  3|  ww|  300|
+---+----+-----+

关联类型支持

https://spark.apache.org/docs/3.2.0/structured-streaming-programming-guide.html#types-of-time-windows

大数据学习之(二十一)structured streaming 基于事件时间窗口操作,第1张
structured_stream_join_1.png

流-流Joins

一个流接收到的一条数据对于另外一条流有可能在任何时刻有对应数据与之匹配,所以我们对两条流数据进行缓存。

  1. 两条流的实时数据有可能有延迟数据,所以我们需要设置watermark机制自动处理迟到数据
  2. 对于缓存的数据也不能无限增大保存,所以需要设置Time Constraint 时间约束来删除缓存数据,使得过于旧的输入数据无法与将来的输入数据进行匹配。
package com.mashibing.stscode.scalacode.jointest

import java.sql.Timestamp

import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  *  流和流join
  */
object StreamAndStreamJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("StreamAndStaticJoin")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._

    //设置第一个流
    // xxx,1,zs,18
    val df1: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node5")
      .option("port", 9998)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (new Timestamp(arr(0).toLong), arr(1).toInt, arr(2),arr(3).toInt)
      }).toDF("ats","aid", "aname", "age")
      .withWatermark("ats","3 seconds")

    //设置第二个流
    val df2: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node5")
      .option("port", 9999)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (new Timestamp(arr(0).toLong), arr(1).toInt, arr(2),arr(3).toInt)
      }).toDF("bts","bid", "bname", "score")
      .withWatermark("bts","5 seconds")


    //两个流进行关联
    import org.apache.spark.sql.functions._
    val result = df1.join(df2,expr(
      """
        | aid = bid and
        | bts >= ats and
        | bts <= ats + interval 10 seconds
      """.stripMargin
    ),"leftOuter")


    val query1: StreamingQuery = df1.writeStream
      .format("console")
      .queryName("query1")
      .start()

    val query2: StreamingQuery = df2.writeStream
      .format("console")
      .queryName("query2")
      .start()

    val query3: StreamingQuery = result.writeStream
      .format("console")
      .queryName("query3")
      .start()

    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {


        if("query1".equals(queryProgress.progress.name)){
          println("query1 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }else if("query2".equals(queryProgress.progress.name)){
          println("query2 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }else{
          println("query3 watermark : "+queryProgress.progress.eventTime.get("watermark"))
        }
      }
    })

   spark.streams.awaitAnyTermination()
  }

}

注意点

对于批数据DataFrame/Dataset的一些操作,目前在实时数据流StructuredStreaming有一些Operator不被支持和一些注意点,如下:
(所以目前还比不上flink)

  1. Streaming Dataset不支持多个流group by 聚合操作

  2. Streaming Dataset不支持limit和获取topN操作

  3. Stremaing Dataset不支持Distinct操作

  4. Streaming Dataset 如果聚合后不支持Deduplication 流去重

  5. Streaming Dataset 在聚合后并且是complete输出模式才支持order by 排序

  6. Stream和Static进行join操作时,Right Outer、Full Outer不被支持。

  7. Dataset count操作在Streaming Dataset中使用ds.groupBy(col).count()代替。

  8. Dataset foreach() 操作在Streaming Dataset中使用ds.writeStream.foreach(...)代替。

  9. Dataset show()操作在Stream Dataset中使用ds.writeStrea.format("console")代替。


https://www.xamrdz.com/backend/3at1933294.html

相关文章: