1.知识点
- scala样例类
- 计算pv时的key的处理方式。
1)并行处理 (解决数据倾斜问题)
先map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
再以窗口结束时间做key分组求和
2) 并行度为1时,结合timeWindowAll如何处理(容易数据倾斜)
- MapFunction的的使用
- AggregateFunction预聚合函数的使用
- WindowFunction的使用
- KeyedProcessFunction的使用
2.业务目标
3.流程心法
4.模块详解
4.1 创建输入输出样例类
4.2 主object实现
4.2.1 创建执行环境并添加数据源
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(6)
// 从文件中读取数据,获取resources中的文件,相对路径
val resource = getClass.getResource("/UserBehavior.csv")
print(resource.getPath)
val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
4.2.2 Datastream map转换为输入样例类
// 转换成样例类类型并提取时间戳和watermark
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val arr = data.split(",")
UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
}) //No implicits found for parameter evidence:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
.assignAscendingTimestamps(_.timestamp * 1000L)
4.2.3 处理逻辑(1)----filter类型,自定义mapFunction
val pvStream = dataStream
.filter(_.behavior == "pv")
// .map( data => ("pv",1L)) //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
.map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
.keyBy(_._1)
.timeWindow(Time.hours(1)) //1小时滚动窗口
.aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可
自定义Mapfunction
class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
override def map(value: UserBehavior): (String, Long) = {
( Random.nextString(10), 1L )
}
}
预聚合函数
// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
override def createAccumulator(): Long = 0L
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
自定义窗口函数
//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
out.collect(PvCount(window.getEnd,input.head))
}
}
4.2.4 处理逻辑(2)---以窗口结束时间分组计算PV之和
自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy
val totalPvStream = pvStream
.keyBy(_.windowEnd)
// .sum("count" ) //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
// 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
.process(new TotalPvCountResult())
totalPvStream.print()
KeyedProcessFunction实现:
每次求和放在一个ValueState中,每来一条数据就状态value+count值。那么什么时候判断收集完了呢。定义个计时器延迟1ms触发,触发后清空状态
class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{
//求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态
lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))
override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
// 每来一个数据,将count值叠加在当前的状态上
val currentTotalCount = totalPvCountResultState.value()
totalPvCountResultState.update(currentTotalCount + i.count) //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态
//注册定时器1ms后触发
context.timerService().registerEventTimeTimer(i.windowEnd + 1)
// collector.collect( )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
val totalPvCount = totalPvCountResultState.value()
out.collect(PvCount(ctx.getCurrentKey, totalPvCount)) //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
totalPvCountResultState.clear() //发送之后清空状态
}
}
4.3 完整代码
package com.iqyi.bi.networkflow_analysis
import org.apache.flink.api.common.functions.{AggregateFunction, MapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import scala.util.Random
// 定义输入数据样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
// 定义输出pv统计的样例类
case class PvCount(windowEnd: Long, count: Long)
object PageView {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(6)
// 从文件中读取数据,获取resources中的文件,相对路径
val resource = getClass.getResource("/UserBehavior.csv")
print(resource.getPath)
val inputStream: DataStream[String] = env.readTextFile(resource.getPath)
// 转换成样例类类型并提取时间戳和watermark
val dataStream: DataStream[UserBehavior] = inputStream
.map(data => {
val arr = data.split(",")
UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong)
}) //No implicits found for parameter evidence:,需要引入 createTypeInformation,或者直接引入import org.apache.flink.streaming.api.scala._
.assignAscendingTimestamps(_.timestamp * 1000L)
val pvStream = dataStream
.filter(_.behavior == "pv")
// .map( data => ("pv",1L)) //第一种方式:定义一个Pv字符串作为分组的哑key,所有数据被分到同一个组,跟timeWindowAll操作,相当于分发到同一个组里去,并行度就无意义了。同样如果有同一个商品是个热点数据,那么大量的同一个商品都会被分到同一个分区中,从而引发数据倾斜。那么如何规避数据倾斜呢?
.map(new MyMapper())//第二种方式:自定义mapper,生成一个随机生成的key
.keyBy(_._1)
.timeWindow(Time.hours(1)) //1小时滚动窗口
.aggregate(new PvCountAgg(),new PvCountWindowResult()) //报错原因,定义的输入是map中的data是(String,Long),但是"pv",1中的1是Int类型,把1改成1L即可
//自定义mapper之后,每个key没hash打散到不同分组,上一步输出的是每个组里的每个key的pv,那么如果要计算当前窗口的总pv,就要按照winEnd来重新keyBy
val totalPvStream = pvStream
.keyBy(_.windowEnd)
// .sum("count" ) //此操作是当前来一条数据就加1,所以会滚动输出,但是我们要的结果是当前窗口收集齐之后再输出,如何来做?
// 自定义一个keyedProcessFunction自行处理吧,如何来判断数据都收集齐了呢。用一个定时器
.process(new TotalPvCountResult())
totalPvStream.print()
env.execute("pv stream")
}
}
// 自定义预聚合函数
class PvCountAgg() extends AggregateFunction[(String,Long),Long,Long]{
override def createAccumulator(): Long = 0L
override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
//自定义窗口函数
class PvCountWindowResult extends WindowFunction[Long,PvCount,String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PvCount]): Unit = {
out.collect(PvCount(window.getEnd,input.head))
}
}
class TotalPvCountResult extends KeyedProcessFunction[Long,PvCount,PvCount]{
//求和就要把每次求和放在状态变量里,如何判断数据都到齐了,增量聚合的话定义一个状态
lazy val totalPvCountResultState :ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("total-pv", classOf[Long]))
override def processElement(i: PvCount, context: KeyedProcessFunction[Long, PvCount, PvCount]#Context, collector: Collector[PvCount]): Unit = {
// 每来一个数据,将count值叠加在当前的状态上
val currentTotalCount = totalPvCountResultState.value()
totalPvCountResultState.update(currentTotalCount + i.count) //上次输出的PVCOUNT中的count,加上当前状态中的值,再更新当前状态
//注册定时器1ms后触发
context.timerService().registerEventTimeTimer(i.windowEnd + 1)
// collector.collect( )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PvCount, PvCount]#OnTimerContext, out: Collector[PvCount]): Unit = {
val totalPvCount = totalPvCountResultState.value()
out.collect(PvCount(ctx.getCurrentKey, totalPvCount)) //获取ctx.getCurrentKey 当前key,和当前的状态值并且输出
totalPvCountResultState.clear() //发送之后清空状态
}
}
class MyMapper() extends MapFunction[UserBehavior,(String,Long)]{
override def map(value: UserBehavior): (String, Long) = {
( Random.nextString(10), 1L )
}
}