当前位置: 首页>后端>正文

Flink 实战:如何计算实时热门合约

本文将通过使用 Flink 框架实现 实时热门合约 需求。实际业务过程中,如何判断合约是否属于热门合约,可以从以下几个方面进行分析,比如:

交易数量:合约被调用的次数可以作为其热门程度的指标之一。

交易金额:合约处理的资金量也是评判热门程度的重要指标。

活跃用户数量:调用合约的用户数量可以反映合约的受欢迎程度。

交易频率:合约的调用频率可以反映其热门程度和使用情况。

但我们本次目的主要是关于学习 Flink API 的一些使用,以及在生产过程中,我们应该如何一步一步改进,所以本次我们主要以 交易数量 作为热门合约的评判标准。

通过本文你将学到:

如何基于 EventTime 处理,如何指定 Watermark

如何使用 Flink 灵活的 Window API

何时需要用到 State,以及如何使用

如何使用 ProcessFunction 实现 TopN 功能

如何使用 Flink DataStream API 读取 kafka 数据源

如何将计算结果 Sink 到 Kafka 存储

实战案例介绍

要实现一个 实时热门合约 的需求,我们首先拆解成以下思路:

基本需求

每隔 5 分钟输出最近一小时交易量最多的前N个合约

过滤出属于合约的交易数量

解决思路

抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口

在所有交易行为数据中,过滤出合约行为进行统计

构建滑动窗口,窗口长度为1小时,滑动距离为 5 分钟

将KeyedStream中的元素存储到ListState中,当水位线超过窗口结束时间时,排序输出

按每个窗口聚合,输出每个窗口中交易量前N名的合约

数据准备

这里我们采用已经同步好在 kafka 的真实的 链上数据 ,数据结构如下:

{

"hash":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",

"blockHash":"0x7785b54d5e82bab42a0b1a3ef015ab1f0b3dce78fe188f0838993d360e26289a",

"blockNumber":19168715,

"from":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",//交易发起地址

"to":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",//交易接收地址

"value":0,

"timestamp":1707216599,

"transactionsType":1//0:普通账户交易 1:合约账户交易

}

编写程序

首先获取环境

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();

创建kafka数据源

我们已经将链上数据同步管道开启,在实时同步数据到 kafka。

我们先创建一个 Transactions 的 POJO 类,所有成员变量声明成public。

/**

* 交易行为数据结构

*/

publicclassTransactions{

publicStringhash;

publicStringblockHash;

publicBigIntegerblockNumber;

publicStringfrom;

publicStringto;

publicBigIntegervalue;

publiclongtimestamp;

publicIntegertransactionsType;

publicTransactions(){}

publicTransactions(Stringhash,StringblockHash,BigIntegerblockNumber,Stringfrom,Stringto,BigIntegervalue,longtimestamp,IntegertransactionsType) {

this.hash=hash;

this.blockHash=blockHash;

this.blockNumber=blockNumber;

this.from=from;

this.to=to;

this.value=value;

this.timestamp=timestamp;

this.transactionsType=transactionsType;

?? }

publicStringgetHash() {

returnhash;

?? }

publicvoidsetHash(Stringhash) {

this.hash=hash;

?? }

publicStringgetBlockHash() {

returnblockHash;

?? }

publicvoidsetBlockHash(StringblockHash) {

this.blockHash=blockHash;

?? }

publicBigIntegergetBlockNumber() {

returnblockNumber;

?? }

publicvoidsetBlockNumber(BigIntegerblockNumber) {

this.blockNumber=blockNumber;

?? }

publicStringgetFrom() {

returnfrom;

?? }

publicvoidsetFrom(Stringfrom) {

this.from=from;

?? }

publicStringgetTo() {

returnto;

?? }

publicvoidsetTo(Stringto) {

this.to=to;

?? }

publicBigIntegergetValue() {

returnvalue;

?? }

publicvoidsetValue(BigIntegervalue) {

this.value=value;

?? }

publiclonggetTimestamp() {

returntimestamp;

?? }

publicvoidsetTimestamp(longtimestamp) {

this.timestamp=timestamp;

?? }

publicIntegergetTransactionsType() {

returntransactionsType;

?? }

publicvoidsetTransactionsType(IntegertransactionsType) {

this.transactionsType=transactionsType;

?? }

@Override

publicStringtoString() {

return"Transactions{"+

"hash='"+hash+'\''+

", blockHash='"+blockHash+'\''+

", blockNumber="+blockNumber+

", from='"+from+'\''+

", to='"+to+'\''+

", value="+value+

", timestamp="+timestamp+

", transactionsType="+transactionsType+

'}';

?? }

}

接下来我们就可以创建一个一个读取 kafka 数据的数据源。

Propertiesproperties=newProperties();

properties.setProperty("bootstrap.servers","localhost:9092");//kafka地址

properties.setProperty("group.id","ods_transactions");//消费组

properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//key反序列化

properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//value反序列化

properties.setProperty("auto.offset.reset","latest");//消费偏移从最新开始

DataStreamSource<String>datasource=env.addSource(newFlinkKafkaConsumer<String>("ods_transactions",newSimpleStringSchema(),properties));

下一步我们将数据源转为 Transactions 对象类型

DataStream<Transactions>kafkaStream=datasource.map(newMapFunction<String,Transactions>() {

@Override

publicTransactionsmap(Stringmessage)throwsException{

Gsongson=newGson();

returngson.fromJson(message,Transactions.class);

? ? ? ? ?? }

? ? ?? });

这就创建了一个 Transactions 类型的 DataStream。

EventTime 与 Watermark

在本案例中,我们需要统计业务时间上的每小时的合约交易量,所以要基于 EventTime 来处理。

将每条数据的业务时间就当做 Watermark,这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。

DataStream<Transactions>timedStream=kafkaStream

.assignTimestampsAndWatermarks(newAscendingTimestampExtractor<Transactions>() {

@Override

publiclongextractAscendingTimestamp(Transactionstransactions) {

// 原始数据单位秒,将其转成毫秒

returntransactions.timestamp*1000;

? ? ? ? ? ? ? ? ?? }

? ? ? ? ? ? ?? });

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出合约交易数据

由于原始数据中存在普通交易和合约行为的数据,但是我们只需要统计合约数据,所以先使用 FilterFunction 将合约行为数据过滤出来。

DataStream<Transactions>contractStream=timedStream

.filter(newFilterFunction<Transactions>() {

@Override

publicbooleanfilter(Transactionstransactions)throwsException{

// 过滤出只有合约交易的数据

returntransactions.transactionsType.equals(TransactionEnum.CONTRACT_ADDRESS.getCode());

? ? ? ? ? ? ? ? ?? }

? ? ? ? ? ? ?? });

窗口统计合约交易量

由于要每隔5分钟统计一次最近一小时每个合约的交易量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的合约交易量。是一个常见的滑动窗口需求(Sliding Window)。

DataStream<ContractCount>windowedStream=contractStream

.keyBy("to")

.timeWindow(Time.minutes(60),Time.minutes(5))

.aggregate(newCountAgg(),newWindowResultFunction());

这里使用.keyBy("to")对合约进行分组,使用.timeWindow(Time size, Time slide)对每个合约做滑动窗口(1小时窗口,5分钟滑动一次)。使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。

这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

/** COUNT 统计的聚合函数实现,每出现一条记录加一 */

publicstaticclassCountAggimplementsAggregateFunction<Transactions,Long,Long>{

@Override

publicLongcreateAccumulator() {

return0L;

? ? ?? }

@Override

publicLongadd(Transactionstransactions,Longacc) {

returnacc+1;

? ? ?? }

@Override

publicLonggetResult(Longacc) {

returnacc;

? ? ?? }

@Override

publicLongmerge(Longacc1,Longacc2) {

returnacc1+acc2;

? ? ?? }

?? }

这里实现的WindowResultFunction将主键合约地址,窗口,交易量封装成了ContractCount进行输出。

/** 用于输出窗口的结果 */

publicstaticclassWindowResultFunctionimplementsWindowFunction<Long,ContractCount,Tuple,TimeWindow>{

@Override

publicvoidapply(

Tuplekey,// 窗口的主键,即 to 合约地址

TimeWindowwindow,// 窗口

Iterable<Long>aggregateResult,// 聚合函数的结果,即 count 值

Collector<ContractCount>collector// 输出类型为 ContractCount

)throwsException{

Stringto=((Tuple1<String>)key).f0;

Longcount=aggregateResult.iterator().next();

collector.collect(ContractCount.of(to,window.getEnd(),count));

? ? ?? }

?? }

/** 合约交易量(窗口操作的输出类型) */

publicclassContractCount{

publicStringto;// 合约地址

publicLongwindowEnd;// 窗口结束时间戳

publicLongcount;// 合约交易量

publicContractCount(){}

publicContractCount(Stringto,LongwindowEnd,Longcount) {

this.to=to;

this.windowEnd=windowEnd;

this.count=count;

?? }

publicStringgetTo() {

returnto;

?? }

publicvoidsetTo(Stringto) {

this.to=to;

?? }

publicLonggetWindowEnd() {

returnwindowEnd;

?? }

publicvoidsetWindowEnd(LongwindowEnd) {

this.windowEnd=windowEnd;

?? }

publicLonggetCount() {

returncount;

?? }

publicvoidsetCount(Longcount) {

this.count=count;

?? }

@Override

publicStringtoString() {

return"ContractCount{"+

"to='"+to+'\''+

", windowEnd="+windowEnd+

", count="+count+

'}';

?? }

}

现在我们得到了每个合约在每个窗口的交易量的数据流。

TopN 计算最热门合约

为了统计每个窗口下最热门的合约,我们需要再次按窗口进行分组,这里根据ContractCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotContracts 来计算交易量排名前5名的合约,并将排名结果sink到 kafka。

DataStream<String>topContracts=windowedStream

.keyBy("windowEnd")

.process(newTopNHotContracts(5));// 求交易量前5的合约

topContracts.addSink(newFlinkKafkaProducer<String>(

"hot_contract",

newSimpleStringSchema(),

properties

? ? ?? ));

/** 求某个窗口中前 N 名的热门合约,key 为窗口时间戳 */

publicstaticclassTopNHotContractsextendsKeyedProcessFunction<Tuple,ContractCount,String>{

privatefinalinttopSize;

publicTopNHotContracts(inttopSize) {

this.topSize=topSize;

? ? ?? }

// 用于存储合约与交易量的状态,待收齐同一个窗口的数据后,再触发 TopN 计算

privateListState<ContractCount>contractState;

@Override

publicvoidopen(Configurationparameters)throwsException{

super.open(parameters);

// 状态的注册

ListStateDescriptor<ContractCount>contractStateDesc=newListStateDescriptor<>(

"contractState-state",

ContractCount.class);

contractState=getRuntimeContext().getListState(contractStateDesc);

? ? ?? }

@Override

publicvoidprocessElement(

ContractCountinput,

Contextcontext,

Collector<String>collector)throwsException{

// 每条数据都保存到状态中

contractState.add(input);

// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有合约数据

context.timerService().registerEventTimeTimer(input.windowEnd+1);

? ? ?? }

@Override

publicvoidonTimer(

longtimestamp,OnTimerContextctx,Collector<String>out)throwsException{

// 获取收到的所有合约交易两

List<ContractCount>allContract=newArrayList<>();

for(ContractCountcontract:contractState.get()) {

allContract.add(contract);

? ? ? ? ?? }

// 提前清除状态中的数据,释放空间

contractState.clear();

// 按照交易量从大到小排序

allContract.sort(newComparator<ContractCount>() {

@Override

publicintcompare(ContractCounto1,ContractCounto2) {

return(int) (o2.count-o1.count);

? ? ? ? ? ? ?? }

? ? ? ? ?? });

// 将排名信息放在 List ,sink 到kafka

List<HotContract>list=newArrayList<>();

for(inti=0;i<topSize;i++) {

ContractCountcurrentItem=allContract.get(i);

HotContracthotContract=newHotContract();

hotContract.setAddress(currentItem.to);

hotContract.setCount(currentItem.count);

list.add(hotContract);

? ? ? ? ?? }

out.collect(GsonUtil.toJson(list));

? ? ?? }

?? }

打印输出

最后一步我们将结果打印输出到控制台,并调用env.execute执行任务。

topContracts.print();

env.execute("Hot contracts Job");

总结

我们来回顾下整个计算的流程,以及转换的原理。

1)首先,我们通过读取kafka数据源,得到 datasourceStream

Flink 实战:如何计算实时热门合约,第1张

2)通过将数据源数据 map 转化为 POJO 对象,得到 kafkaStream

Flink 实战:如何计算实时热门合约,第2张

3)通过 filter 算子,过滤出属于合约的交易,得到 contractStream

Flink 实战:如何计算实时热门合约,第3张

4)按照合约地址进行 keyBy("to") 分区

Flink 实战:如何计算实时热门合约,第4张

5)然后进行开窗

Flink 实战:如何计算实时热门合约,第5张

6)进行聚合统计

Flink 实战:如何计算实时热门合约,第6张

这只是一个基本的流程,实际生产过程中,我们还要考虑多个方面,比如:

1)一些基本的配置信息从环境变量获取,变成参数传入,从配置文件读取等

2)开启 Checkpoint,配置 CheckpointingMode.EXACTLY_ONCE,保证端到端的Exactly-Once一致性

3)并行度的设置

4)Flink 业务日志的配置,进行监控业务日志

5)程序的容错性等等

6)在读取 kafka 数据时,在读取数据反序列化时就转为对象,而不是通过 map 进行转换


https://www.xamrdz.com/backend/3vn1941172.html

相关文章: