本文将通过使用 Flink 框架实现 实时热门合约 需求。实际业务过程中,如何判断合约是否属于热门合约,可以从以下几个方面进行分析,比如:
交易数量:合约被调用的次数可以作为其热门程度的指标之一。
交易金额:合约处理的资金量也是评判热门程度的重要指标。
活跃用户数量:调用合约的用户数量可以反映合约的受欢迎程度。
交易频率:合约的调用频率可以反映其热门程度和使用情况。
但我们本次目的主要是关于学习 Flink API 的一些使用,以及在生产过程中,我们应该如何一步一步改进,所以本次我们主要以 交易数量 作为热门合约的评判标准。
通过本文你将学到:
如何基于 EventTime 处理,如何指定 Watermark
如何使用 Flink 灵活的 Window API
何时需要用到 State,以及如何使用
如何使用 ProcessFunction 实现 TopN 功能
如何使用 Flink DataStream API 读取 kafka 数据源
如何将计算结果 Sink 到 Kafka 存储
实战案例介绍
要实现一个 实时热门合约 的需求,我们首先拆解成以下思路:
基本需求
每隔 5 分钟输出最近一小时交易量最多的前N个合约
过滤出属于合约的交易数量
解决思路
抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
在所有交易行为数据中,过滤出合约行为进行统计
构建滑动窗口,窗口长度为1小时,滑动距离为 5 分钟
将KeyedStream中的元素存储到ListState中,当水位线超过窗口结束时间时,排序输出
按每个窗口聚合,输出每个窗口中交易量前N名的合约
数据准备
这里我们采用已经同步好在 kafka 的真实的 链上数据 ,数据结构如下:
{
"hash":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",
"blockHash":"0x7785b54d5e82bab42a0b1a3ef015ab1f0b3dce78fe188f0838993d360e26289a",
"blockNumber":19168715,
"from":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",//交易发起地址
"to":"0xf20f572847c23be6055f5373691c16b002cd573a16314ca2509c7c13805719c1",//交易接收地址
"value":0,
"timestamp":1707216599,
"transactionsType":1//0:普通账户交易 1:合约账户交易
}
编写程序
首先获取环境
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
创建kafka数据源
我们已经将链上数据同步管道开启,在实时同步数据到 kafka。
我们先创建一个 Transactions 的 POJO 类,所有成员变量声明成public。
/**
* 交易行为数据结构
*/
publicclassTransactions{
publicStringhash;
publicStringblockHash;
publicBigIntegerblockNumber;
publicStringfrom;
publicStringto;
publicBigIntegervalue;
publiclongtimestamp;
publicIntegertransactionsType;
publicTransactions(){}
publicTransactions(Stringhash,StringblockHash,BigIntegerblockNumber,Stringfrom,Stringto,BigIntegervalue,longtimestamp,IntegertransactionsType) {
this.hash=hash;
this.blockHash=blockHash;
this.blockNumber=blockNumber;
this.from=from;
this.to=to;
this.value=value;
this.timestamp=timestamp;
this.transactionsType=transactionsType;
?? }
publicStringgetHash() {
returnhash;
?? }
publicvoidsetHash(Stringhash) {
this.hash=hash;
?? }
publicStringgetBlockHash() {
returnblockHash;
?? }
publicvoidsetBlockHash(StringblockHash) {
this.blockHash=blockHash;
?? }
publicBigIntegergetBlockNumber() {
returnblockNumber;
?? }
publicvoidsetBlockNumber(BigIntegerblockNumber) {
this.blockNumber=blockNumber;
?? }
publicStringgetFrom() {
returnfrom;
?? }
publicvoidsetFrom(Stringfrom) {
this.from=from;
?? }
publicStringgetTo() {
returnto;
?? }
publicvoidsetTo(Stringto) {
this.to=to;
?? }
publicBigIntegergetValue() {
returnvalue;
?? }
publicvoidsetValue(BigIntegervalue) {
this.value=value;
?? }
publiclonggetTimestamp() {
returntimestamp;
?? }
publicvoidsetTimestamp(longtimestamp) {
this.timestamp=timestamp;
?? }
publicIntegergetTransactionsType() {
returntransactionsType;
?? }
publicvoidsetTransactionsType(IntegertransactionsType) {
this.transactionsType=transactionsType;
?? }
@Override
publicStringtoString() {
return"Transactions{"+
"hash='"+hash+'\''+
", blockHash='"+blockHash+'\''+
", blockNumber="+blockNumber+
", from='"+from+'\''+
", to='"+to+'\''+
", value="+value+
", timestamp="+timestamp+
", transactionsType="+transactionsType+
'}';
?? }
}
接下来我们就可以创建一个一个读取 kafka 数据的数据源。
Propertiesproperties=newProperties();
properties.setProperty("bootstrap.servers","localhost:9092");//kafka地址
properties.setProperty("group.id","ods_transactions");//消费组
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//key反序列化
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");//value反序列化
properties.setProperty("auto.offset.reset","latest");//消费偏移从最新开始
DataStreamSource<String>datasource=env.addSource(newFlinkKafkaConsumer<String>("ods_transactions",newSimpleStringSchema(),properties));
下一步我们将数据源转为 Transactions 对象类型
DataStream<Transactions>kafkaStream=datasource.map(newMapFunction<String,Transactions>() {
@Override
publicTransactionsmap(Stringmessage)throwsException{
Gsongson=newGson();
returngson.fromJson(message,Transactions.class);
? ? ? ? ?? }
? ? ?? });
这就创建了一个 Transactions 类型的 DataStream。
EventTime 与 Watermark
在本案例中,我们需要统计业务时间上的每小时的合约交易量,所以要基于 EventTime 来处理。
将每条数据的业务时间就当做 Watermark,这里我们用 AscendingTimestampExtractor 来实现时间戳的抽取和 Watermark 的生成。
DataStream<Transactions>timedStream=kafkaStream
.assignTimestampsAndWatermarks(newAscendingTimestampExtractor<Transactions>() {
@Override
publiclongextractAscendingTimestamp(Transactionstransactions) {
// 原始数据单位秒,将其转成毫秒
returntransactions.timestamp*1000;
? ? ? ? ? ? ? ? ?? }
? ? ? ? ? ? ?? });
这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。
过滤出合约交易数据
由于原始数据中存在普通交易和合约行为的数据,但是我们只需要统计合约数据,所以先使用 FilterFunction 将合约行为数据过滤出来。
DataStream<Transactions>contractStream=timedStream
.filter(newFilterFunction<Transactions>() {
@Override
publicbooleanfilter(Transactionstransactions)throwsException{
// 过滤出只有合约交易的数据
returntransactions.transactionsType.equals(TransactionEnum.CONTRACT_ADDRESS.getCode());
? ? ? ? ? ? ? ? ?? }
? ? ? ? ? ? ?? });
窗口统计合约交易量
由于要每隔5分钟统计一次最近一小时每个合约的交易量,所以窗口大小是一小时,每隔5分钟滑动一次。即分别要统计 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的合约交易量。是一个常见的滑动窗口需求(Sliding Window)。
DataStream<ContractCount>windowedStream=contractStream
.keyBy("to")
.timeWindow(Time.minutes(60),Time.minutes(5))
.aggregate(newCountAgg(),newWindowResultFunction());
这里使用.keyBy("to")对合约进行分组,使用.timeWindow(Time size, Time slide)对每个合约做滑动窗口(1小时窗口,5分钟滑动一次)。使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。
这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。
/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
publicstaticclassCountAggimplementsAggregateFunction<Transactions,Long,Long>{
@Override
publicLongcreateAccumulator() {
return0L;
? ? ?? }
@Override
publicLongadd(Transactionstransactions,Longacc) {
returnacc+1;
? ? ?? }
@Override
publicLonggetResult(Longacc) {
returnacc;
? ? ?? }
@Override
publicLongmerge(Longacc1,Longacc2) {
returnacc1+acc2;
? ? ?? }
?? }
这里实现的WindowResultFunction将主键合约地址,窗口,交易量封装成了ContractCount进行输出。
/** 用于输出窗口的结果 */
publicstaticclassWindowResultFunctionimplementsWindowFunction<Long,ContractCount,Tuple,TimeWindow>{
@Override
publicvoidapply(
Tuplekey,// 窗口的主键,即 to 合约地址
TimeWindowwindow,// 窗口
Iterable<Long>aggregateResult,// 聚合函数的结果,即 count 值
Collector<ContractCount>collector// 输出类型为 ContractCount
)throwsException{
Stringto=((Tuple1<String>)key).f0;
Longcount=aggregateResult.iterator().next();
collector.collect(ContractCount.of(to,window.getEnd(),count));
? ? ?? }
?? }
/** 合约交易量(窗口操作的输出类型) */
publicclassContractCount{
publicStringto;// 合约地址
publicLongwindowEnd;// 窗口结束时间戳
publicLongcount;// 合约交易量
publicContractCount(){}
publicContractCount(Stringto,LongwindowEnd,Longcount) {
this.to=to;
this.windowEnd=windowEnd;
this.count=count;
?? }
publicStringgetTo() {
returnto;
?? }
publicvoidsetTo(Stringto) {
this.to=to;
?? }
publicLonggetWindowEnd() {
returnwindowEnd;
?? }
publicvoidsetWindowEnd(LongwindowEnd) {
this.windowEnd=windowEnd;
?? }
publicLonggetCount() {
returncount;
?? }
publicvoidsetCount(Longcount) {
this.count=count;
?? }
@Override
publicStringtoString() {
return"ContractCount{"+
"to='"+to+'\''+
", windowEnd="+windowEnd+
", count="+count+
'}';
?? }
}
现在我们得到了每个合约在每个窗口的交易量的数据流。
TopN 计算最热门合约
为了统计每个窗口下最热门的合约,我们需要再次按窗口进行分组,这里根据ContractCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotContracts 来计算交易量排名前5名的合约,并将排名结果sink到 kafka。
DataStream<String>topContracts=windowedStream
.keyBy("windowEnd")
.process(newTopNHotContracts(5));// 求交易量前5的合约
topContracts.addSink(newFlinkKafkaProducer<String>(
"hot_contract",
newSimpleStringSchema(),
properties
? ? ?? ));
/** 求某个窗口中前 N 名的热门合约,key 为窗口时间戳 */
publicstaticclassTopNHotContractsextendsKeyedProcessFunction<Tuple,ContractCount,String>{
privatefinalinttopSize;
publicTopNHotContracts(inttopSize) {
this.topSize=topSize;
? ? ?? }
// 用于存储合约与交易量的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
privateListState<ContractCount>contractState;
@Override
publicvoidopen(Configurationparameters)throwsException{
super.open(parameters);
// 状态的注册
ListStateDescriptor<ContractCount>contractStateDesc=newListStateDescriptor<>(
"contractState-state",
ContractCount.class);
contractState=getRuntimeContext().getListState(contractStateDesc);
? ? ?? }
@Override
publicvoidprocessElement(
ContractCountinput,
Contextcontext,
Collector<String>collector)throwsException{
// 每条数据都保存到状态中
contractState.add(input);
// 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有合约数据
context.timerService().registerEventTimeTimer(input.windowEnd+1);
? ? ?? }
@Override
publicvoidonTimer(
longtimestamp,OnTimerContextctx,Collector<String>out)throwsException{
// 获取收到的所有合约交易两
List<ContractCount>allContract=newArrayList<>();
for(ContractCountcontract:contractState.get()) {
allContract.add(contract);
? ? ? ? ?? }
// 提前清除状态中的数据,释放空间
contractState.clear();
// 按照交易量从大到小排序
allContract.sort(newComparator<ContractCount>() {
@Override
publicintcompare(ContractCounto1,ContractCounto2) {
return(int) (o2.count-o1.count);
? ? ? ? ? ? ?? }
? ? ? ? ?? });
// 将排名信息放在 List ,sink 到kafka
List<HotContract>list=newArrayList<>();
for(inti=0;i<topSize;i++) {
ContractCountcurrentItem=allContract.get(i);
HotContracthotContract=newHotContract();
hotContract.setAddress(currentItem.to);
hotContract.setCount(currentItem.count);
list.add(hotContract);
? ? ? ? ?? }
out.collect(GsonUtil.toJson(list));
? ? ?? }
?? }
打印输出
最后一步我们将结果打印输出到控制台,并调用env.execute执行任务。
topContracts.print();
env.execute("Hot contracts Job");
总结
我们来回顾下整个计算的流程,以及转换的原理。
1)首先,我们通过读取kafka数据源,得到 datasourceStream
2)通过将数据源数据 map 转化为 POJO 对象,得到 kafkaStream
3)通过 filter 算子,过滤出属于合约的交易,得到 contractStream
4)按照合约地址进行 keyBy("to") 分区
5)然后进行开窗
6)进行聚合统计
这只是一个基本的流程,实际生产过程中,我们还要考虑多个方面,比如:
1)一些基本的配置信息从环境变量获取,变成参数传入,从配置文件读取等
2)开启 Checkpoint,配置 CheckpointingMode.EXACTLY_ONCE,保证端到端的Exactly-Once一致性
3)并行度的设置
4)Flink 业务日志的配置,进行监控业务日志
5)程序的容错性等等
6)在读取 kafka 数据时,在读取数据反序列化时就转为对象,而不是通过 map 进行转换