当前位置: 首页>数据库>正文

flink substring 函数使用 flink function

一,概述
    Flink 的AggregateFunction是一个基于中间计算结果状态进行增量计算的函数。由于是迭代计算方式,所以,在窗口处理过程中,不用缓存整个窗口的数据,所以效率执行比较高。

二,AggregateFunction接口类

输入类型(IN),累加器类型(ACC)和输出类型(OUT)。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
  ...............................
}

自定义聚合函数需要实现AggregateFunction接口类,它有四个接口实现方法:

a.创建一个新的累加器,启动一个新的聚合,负责迭代状态的初始化

ACC createAccumulator();

b.对于数据的每条数据,和迭代数据的聚合的具体实现

ACC add(IN value, ACC accumulator);

c.合并两个累加器,返回一个具有合并状态的累加器

ACC merge(ACC a, ACC b);

d.从累加器获取聚合的结果

OUT getResult(ACC accumulator);

三、代码实例   

1.模拟场景:

    从文件/socket读取数据,数据包含三个字段:商品ID,用户ID,访问类型(1.点击查看 2.收藏 3.购买),访问时间;这里每隔3秒对最近6秒内的数据进行汇总计算各个商品的“点击查看”访问量,也就是访问类型为1的数据。

    
    这里自定义聚合函数MyCountAggregate数据进行预聚合,自定义窗口函数MyCountWindowFunction2对聚合的数据封装成字符串,并加上窗口结束时间信息进行输出。

2.数据准备:

product1,user14,1,1586855115084
product2,user19,2,1586855116087
product2,user19,1,1586855116087
product3,user17,1,1586855117089
product1,user17,1,1586855118092
product2,user17,1,1586855119095
product3,user15,1,1586855120097
product1,user12,1,1586855121100
product2,user13,1,1586855122102
product3,user13,1,1586855123105
product1,user13,1,1586855124108
product2,user19,3,1586855116087
product2,user16,1,1586855125111
product1,user17,1,1586855136113
product1,user14,1,1586855127116
product2,user16,1,1586855128119
product2,user16,1,1586855129122
product3,user16,1,1586855130125
product2,user11,1,1586855131128
product1,user16,1,1586855132131
product2,user13,1,1586855133134
product3,user16,1,1586855134137
product3,user13,1,1586855135139
product2,user19,3,1586855116087
product1,user18,1,1586855136142
product2,user12,1,1586855137145
product1,user13,1,1586855138148
product3,user17,1,1586855139150

3.自定义聚合函数MyCountAggregate

package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.api.common.functions.AggregateFunction;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-15 22:00
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.aggreagate
 * 输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
 */
public class MyCountAggregate implements AggregateFunction<ProductViewData, Long, Long> {
    @Override
    public Long createAccumulator() {
        /*访问量初始化为0*/
        return 0L;
    }
    @Override
    public Long add(ProductViewData value, Long accumulator) {
        /*访问量直接+1 即可*/
        return accumulator+1;
    }
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }
    /*合并两个统计量*/
    @Override
    public Long merge(Long a, Long b) {
        return a+b;
    }
}

4.自定义窗口函数

package com.hadoop.ljs.flink110.aggreagate;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-15 21:56
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.aggreagate
 *  *自定义窗口函数,封装成字符串
 *  *第一个参数是上面MyCountAggregate的输出,就是商品的访问量统计
 *  * 第二个参数 输出 这里为了演示 简单输出字符串
 *  * 第三个就是 窗口类 能获取窗口结束时间
 */
public class MyCountWindowFunction2 implements WindowFunction<Long,String,String, TimeWindow> {
@Override
public void apply(String productId, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {
     /*商品访问统计输出*/
    /*out.collect("productId"productId,window.getEnd(),input.iterator().next()));*/
    out.collect("----------------窗口时间:"+window.getEnd());
    out.collect("商品ID: "+productId+"  浏览量: "+input.iterator().next());
    }
}

5.主函数,代码如下:

package com.hadoop.ljs.flink110.aggreagate;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-14 11:28
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.aggreagate
 * 自定义聚合函数类和窗口类,进行商品访问量的统计,根据滑动时间窗口,按照访问量排序输出
 */
public class AggregateFunctionMain2 {

    public  static int windowSize=6000;/*滑动窗口大小*/
    public  static int windowSlider=3000;/*滑动窗口滑动间隔*/
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /*DataStream<String> sourceData = senv.socketTextStream("localhost",9000);*/ 
        //从文件读取数据,也可以从socket读取数据       
        DataStream<String> sourceData = senv.readTextFile("D:\projectData\ProductViewData2.txt");
        DataStream<ProductViewData> productViewData = sourceData.map(new MapFunction<String, ProductViewData>() {
            @Override
            public ProductViewData map(String value) throws Exception {
                String[] record = value.split(",");
                return new ProductViewData(record[0], record[1], Long.valueOf(record[2]), Long.valueOf(record[3]));
            }
        }).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<ProductViewData>(){
            @Override
            public long extractAscendingTimestamp(ProductViewData element) {
                return element.timestamp;
            }
        });
        /*过滤操作类型为1  点击查看的操作*/
        DataStream<String> productViewCount = productViewData.filter(new FilterFunction<ProductViewData>() {
            @Override
            public boolean filter(ProductViewData value) throws Exception {
                if(value.operationType==1){
                    return true;
                }
                return false;
            }
        }).keyBy(new KeySelector<ProductViewData, String>() {
            @Override
            public String getKey(ProductViewData value) throws Exception {
                return value.productId;
            }
            //时间窗口 6秒  滑动间隔3秒
        }).timeWindow(Time.milliseconds(windowSize), Time.milliseconds(windowSlider))
        /*这里按照窗口进行聚合*/
        .aggregate(new MyCountAggregate(), new MyCountWindowFunction2());
        //聚合结果输出
        productViewCount.print();

        senv.execute("AggregateFunctionMain2");
    }
}

   这里自定义聚合函数演示完毕


https://www.xamrdz.com/database/6pn1938659.html

相关文章: