当前位置: 首页>编程语言>正文

flink storm 管道 flink pv

1.知识点

  • scala样例类
  • 计算pv时的key的处理方式。

        1)并行处理  (解决数据倾斜问题)

先map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key

              再以窗口结束时间做key分组求和            

        2) 并行度为1时,结合timeWindowAll如何处理(容易数据倾斜)

  • MapFunction的的使用
  • AggregateFunction预聚合函数的使用
  • WindowFunction的使用
  • KeyedProcessFunction的使用

2.业务目标

3.流程心法

4.模块详解

4.1 创建输入输出样例类

4.2 主object实现

4.2.1 创建执行环境并添加数据源

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(6)

    // 从文件中读取数据,获取resources中的文件,相对路径
    val resource = getClass.getResource("/UserBehavior.csv")
    print(resource.getPath)
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

4.2.2 Datastream map转换为输入样例类

// 转换成样例类类型并提取时间戳和watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })   //No implicits found for parameter evidence:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
      .assignAscendingTimestamps(_.timestamp * 1000L)

4.2.3 处理逻辑(1)----filter类型,自定义mapFunction

val pvStream  = dataStream
      .filter(_.behavior == "pv")
//      .map( data => ("pv",1L))  //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
      .map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1小时滚动窗口
      .aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可

自定义Mapfunction

class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
  override def map(value: UserBehavior): (String, Long) = {
    ( Random.nextString(10), 1L )
  }
}

预聚合函数

// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

  自定义窗口函数

//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd,input.head))
  }
}

4.2.4 处理逻辑(2)---以窗口结束时间分组计算PV之和

自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy
val totalPvStream =  pvStream
      .keyBy(_.windowEnd)
//     .sum("count" )   //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
                            // 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
     .process(new TotalPvCountResult())

    totalPvStream.print()
KeyedProcessFunction实现:

   每次求和放在一个ValueState中,每来一条数据就状态value+count值。那么什么时候判断收集完了呢。定义个计时器延迟1ms触发,触发后清空状态

class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{

  //求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态

  lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))

  override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
    // 每来一个数据,将count值叠加在当前的状态上
    val currentTotalCount = totalPvCountResultState.value()
    totalPvCountResultState.update(currentTotalCount + i.count)  //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态

    //注册定时器1ms后触发
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
//    collector.collect( )

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    val totalPvCount = totalPvCountResultState.value()
    out.collect(PvCount(ctx.getCurrentKey, totalPvCount))  //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
    totalPvCountResultState.clear() //发送之后清空状态

  }
}

4.3 完整代码

package com.iqyi.bi.networkflow_analysis

import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._

import scala.util.Random



// 定义输入数据样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// 定义输出pv统计的样例类
case class PvCount(windowEnd: Long, count: Long)




object PageView {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(6)

    // 从文件中读取数据,获取resources中的文件,相对路径
    val resource = getClass.getResource("/UserBehavior.csv")
    print(resource.getPath)
    val inputStream: DataStream[String] = env.readTextFile(resource.getPath)

    // 转换成样例类类型并提取时间戳和watermark
    val dataStream: DataStream[UserBehavior] = inputStream
      .map(data => {
        val arr = data.split(",")
        UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
      })   //No implicits found for parameter evidence:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val pvStream  = dataStream
      .filter(_.behavior == "pv")
//      .map( data => ("pv",1L))  //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
      .map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
      .keyBy(_._1)
      .timeWindow(Time.hours(1)) //1小时滚动窗口
      .aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可


    //自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy

   val totalPvStream =  pvStream
      .keyBy(_.windowEnd)
//     .sum("count" )   //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
                            // 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
     .process(new TotalPvCountResult())

    totalPvStream.print()

    env.execute("pv stream")
  }




}

// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
  override def createAccumulator(): Long = 0L

  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1

  override def getResult(accumulator: Long): Long = accumulator

  override def merge(a: Long, b: Long): Long = a + b
}

//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
    out.collect(PvCount(window.getEnd,input.head))
  }
}


class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{

  //求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态

  lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))

  override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
    // 每来一个数据,将count值叠加在当前的状态上
    val currentTotalCount = totalPvCountResultState.value()
    totalPvCountResultState.update(currentTotalCount + i.count)  //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态

    //注册定时器1ms后触发
    context.timerService().registerEventTimeTimer(i.windowEnd + 1)
//    collector.collect( )

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
    val totalPvCount = totalPvCountResultState.value()
    out.collect(PvCount(ctx.getCurrentKey, totalPvCount))  //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
    totalPvCountResultState.clear() //发送之后清空状态

  }
}


class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
  override def map(value: UserBehavior): (String, Long) = {
    ( Random.nextString(10), 1L )
  }
}

https://www.xamrdz.com/lan/5wg1963786.html

相关文章: