摘要:本文整理自阿里云资深技术专家,阿里云 Hologres 负责人姜伟华(果贝),在 FFA 实时湖仓专场的分享。本篇内容主要分为四个部分:
- 实时数仓分层的技术需求
- 阿里云一站式实时数仓 Hologres 介绍
- Flink x Hologres:天作之合
- 基于 Flink Catalog 的 Streaming Warehouse 实践
点击查看直播回放 & 演讲PPT
一、实时数仓分层的技术需求
首先,我们讲一讲分层技术。大数据现在越来越讲究实时化,在各种场景下都需要实时。因此大数据需要构建实时数仓,但如何构建实时数仓呢?
离线数仓的构建,有非常标准的方法论体系,首先通过分层 ODS->DWD->DWS->ADS,然后通过定时调度去实现分层和构造。相比离线数仓,实时数仓没有明确的方法论体系。因此在实践中,有各种各样的方法,但没有一个方法是万能。最近行业内提出了 Streaming Warehouse。Streaming Warehouse 的本质是分层之间能够做到实时数据的流动,从而解决实时数仓分层的问题。
下面,我们来了解下实时数仓的主流分层方案。
第一个方案,数据通过 Flink 清洗后,写到 Kafka 形成 ODS 层。再从 Kafka 消费,经过加工形成 DWD 层。然后 Flink 加工成 DWS 层,最后通过加工形成 ADS 层的数据写到 KV 引擎,供应用消费。
因为 Kafka 数据进行分析和探查很麻烦。所以,我们会同步一份 Kafka 数据到实时数仓,通过实时数仓进行分析和探查。
这个体系的优势是层次明确,分工明确。但它的劣势是需要有大量的同步任务;数据资源消耗很大;数据有很多冗余;处理链路较复杂,需要好多组件。除此之外,在这个体系下构建的实时数仓分层,尤其是 Kafka 分层,它的复用性非常差。
第二个方案,这个方法不需要分层,整体架构只需要一层。我们把数据加工清洗后,写到实时数仓里,形成 DWD 层。所有的查询都是查询 DWD 层的明细数据。所以该方法也被称之为 ELT,把明细数据写进去(EL),变换(T)在查询时进行。
这个方案的好处在于,没有 ETL,只有一层;数据修订很方便。但它的弊端有两个方面:
- 在查询性能方面,由于是明细数据查询,所以在某些场景下不能满足 QPS 或延迟的要求。
- 因为没有分层,所以数据复用很困难,很难兼顾各方面的诉求。
第三个方案,既然实时流式无法完成数据的实时数仓分层,我们可以将数据实时写入实时数仓的 DWD 层。DWS 层、ADS 层用离线的高频调度方法,实现分钟级的调度。从而借用离线数仓,进行分层构造。
这个方案的好处在于,它可以复用很多离线经验,方案成本低且成熟。但这个方案存在如下缺点:
- 延迟大。每一层的延迟都跟调度相关,随着层次越多,调度延迟越大。实时数仓也变成了准实时数仓。
- 不能完全复用离线方案。离线调度一般是小时级或天级,我们可以使用全量计算。但在分钟级调度时,必须做增量计算,否则无法及时调度。
第四个方案,Flink 实时加工后,将数据写到实时数仓形成 DWD 层。但 DWS 层或 ADS 层的构造,依赖于实时数仓的实时物化视图的能力。现在实时数仓都开始提供物化视图的能力,这种能力本质上是提供了一些简单的聚合类物化视图。
如果用户的物化视图需求较简单,可以利用实时数仓里的实时物化视图能力,将 DWS 层到 ADS 层的构建自动化。从而让物化视图的查询保证较高的 QPS。这个方案最大的缺点在于,实时物化视图还不成熟,能力有限。
二、阿里云一站式实时数仓 Hologres 介绍
接下来,介绍一下阿里云一站式实时数仓 Hologres 产品。Hologres 是阿里云自研的一站式实时数仓,它同时包含三种能力:
- 它支持传统的实时数仓和 OLAP。
- 它支持 Serving(KV)场景,有非常高的 QPS 和很低的延迟。
- 它支持数据湖和阿里云离线数仓 MaxCompute 加速的能力。
首先,在实时数仓方面。大家可以把 Hologres 当做一个常见的实时数仓。它的特点在于写入侧支持百万 RPS 实时写入,写入即可查,没有延迟。我们支持高性能的实时整行更新和局部更新。其中。整行更新是把整行替换掉,局部更新可以更新一行中的局部字段,二者都是实时更新。
在查询侧,一方面对于实时看板类的需求,支持高 QPS。比如几百或上千 QPS 的简单查询。另一方面,我们也支持复杂计算。近期,Hologres 拿到了 TPC-H 30TB 的性能世界第一的 TPC 官方认证成绩。
其次,在数据服务方面。Hologres 不但支持百万 QPS KV 点查,而且支持阿里云达摩院的 Proxima 向量检索引擎,可以支持非常高的向量检索能力。这些能力在 Hologres 中是全 SQL 表达,对用户使用非常友好。
为了同时兼顾数据服务和实时数仓的需求,我们支持行列共存的数据格式。行列共存是指,一张表的数据既有一份行存,又有一份列存,并且系统保证这两份数据是强一致的。对于 OLAP 分析,优化器会自动选择列存。对于线上服务,它会自动选择行存。
因为 Hologres 同时支持 OLAP 分析和线上服务,其中线上服务要求非常高的稳定性和 SLA。所以我们要保证 OLAP 分析和线上服务时,不会发生冲突。
因此,我们支持读写的分离、OLAP 分析和数据服务的强隔离。这里的强隔离不同于普通实时数仓的弱隔离。弱隔离一般通过资源组的方式来实现,Hologres 直接做了强隔离,并没有通过资源组。
最后,在湖仓数据交互式分析方面。对于阿里云 MaxCompute 离线数仓里的数据,数据湖的数据都可以做秒级的交互式分析。且不需要做任何的数据搬迁。这种表我们称为“外表”。用户可以将外表和内表进行关联分析。
因为 Hologres 的定位是一站式的企业级实时数仓,所以除了上述能力,我们还有很多其他能力。包括数据的治理、成本治理、数据血缘、数据脱敏、数据加密、IP 白名单、数据的备份和恢复等等。
三、Flink x Hologres:天作之合
Flink 和 Hologres 有非常深度的整合关系。最初 Hologres 团队和 Flink 团队是同一个团队,所以 Hologres 在设计之初,着重考虑如何支持 Flink。那么 Hologres 和 Flink 有哪些深度整合的能力?
第一,Hologres 可以作为 Flink 的维表。因为 Hologres 有很强的点查能力,所以它可以作为 Flink 的维表使用。在实时计算的场景下,Flink 对维表的需求很强。我们支持百万级至千万级的 RPS 查询,且维表可实时更新。因此,很多用户会把 Hologres 用作 Flink 的实时维表(比方说实时特征存储)。
第二,Hologres 可以作为 Flink 的结果表。Hologres 有非常强的实时写入和整行实时更新的能力,跟 Flink 非常匹配。因为 Flink 的输出需要强大的 Update,结果表需要很强的覆盖能力、以及实时更新能力。
与此同时,Hologres 还有很强的局部更新能力。局部更新能力在很多场景下,可以替代 Flink 的多流 Join,为客户节省成本。
第三,Hologres 可以作为 Flink 的源表。Hologres 提供了 Binlog,一张表的任何变化,比如 insert、update、delete 等等,都会产生 Binlog 事件。Flink 可以订阅 Binlog,进行驱动计算。由于 Flink 支持 Hologres 的整表读取,二者结合构成了 Flink 全增量一体化的读取能力。并且,我们对接 Flink CDC,它可以驱动 CDC 的计算。
第四,我们在 Flink 中实现了 Hologres 的 Catalog。用户不需要在 Flink 里建 Hologres 的外表。通过 Catalog 的任何操作,都会直接实时反映到 Hologres 里。通过这种方法,Flink+Hologres 就具备了整库同步、Schema Evolution 的能力。
由此可见,我们在维表、结果表、源表、Catalog 四个方面,对 Flink 和 Hologres 做了深度整合。
接下来,介绍一下 Flink 和 Hologres,如何构建 Streaming Warehouse?相比常见的 Flink+Kafka 的分层方案,Flink+Hologres 可以完全将其替换。
首先,将 Flink 写到 Hologres 里,形成 ODS 层。Flink 订阅 ODS 层的 Binlog 进行加工,将 Flink 从 DWD 层再次写入 Hologres 里。
然后,再订阅 DWD 层的 Binlog,通过计算形成 DWS 层,将其再次写入 Hologres 里。最后,由 Hologres 对外提供应用查询。
该方案相比 Kafka 有如下优点:
第一,解决了传统中间层 Kafka 数据不易查、不易更新、不易修正的问题。Hologres 的每一层都可查、可更新、可修正。
第二,Hologres 的每一层都可以单独对外提供服务。因为每一层的数据都是可查的,所以数据的复用会更好,真正实现数仓分层复用的目标。
第三,Hologres 支持数据复用,模型统一,架构简化。目前,该方案已经有很多客户使用。通过 Flink+Hologres,实现实时数仓分层。
上面讲的 Flink+Hologres 的 Streaming Warehouse 方案,其强依赖于以下三个 Hologres 核心能力:
第一个能力是 Binlog。因为实时数仓一般没有 Binlog,但 Hologres 提供了 Binlog 能力,用来驱动 Flink 做实时计算,正因为有了 binlog,Hologres 才能作为流式计算的上游。
第二个能力是行列共存。一张表既有行存数据,又有列存数据。这两份数据是强一致的。行列共存的特性让中间层的每张表,不但能够给 Flink 使用,而且可以给其他应用(比方说 OLAP、或者线上服务)使用。
第三个能力是资源强隔离。实时数仓一般是弱隔离或软隔离,通过资源组、资源队列的方法实现资源隔离。如果 Flink 的资源消耗很大,可能影响中间层的点查性能。但在强隔离的情况下,Flink 对 Binlog 的数据拉取,不会影响线上服务。
通过 Binlog、行列共存、资源强隔离的三个特点,不仅能让 Flink+Hologres 形成 Streaming Warehouse,并且能够使中间的每层数据复用,被其他应用或线上服务使用。
接下来,讲一讲基于 Flink+Hologres 的多流合并。因为 Hologres 有特别强大的局部更新能力,所以我们可以简化 Flink 的多流 Join。
比如在风控场景下,我们需要基于用户 ID 构建用户的多侧面画像。用户画像来自很多数据源,比如客户的浏览行为、成交行为、履约行为等等。把数据源的数据按照用户 ID,把每个用户放到一行里,形成不同的字段,形成用户的完整画像。
传统的方式需要用 Flink 多流 Join 实现,Flink 把上游的多个数据源关联到一起。Join 后写到 Kafka 里,然后驱动下游的 Flink,加工这行完整的数据。
多流 Join 非常耗资源。所以在 Flink+Hologres 场景下,Hologres 可以利用局部更新能力,把一张表定为定义成 Hologres 的行存表或行列共存表。此时,整个方案就简化成上游每个数据源,同步数据到 Hologres 表的若干个字段里,若干个任务同时写入这张表,然后利用 Hologres 的局部更新能力,把数据汇总在一起。
如果打开这张表的 Binlog,上游任何数据源的变化都会更新这张表,使这张表的 Binlog 中生成行数据的最新状态,然后驱动下游的 Flink 继续计算,从而完美匹配常见的风控场景。这种用法下,资源消耗、运维都得到了极大的简化。
四、基于 Flink Catalog 的 Streaming Warehouse 实践
虽然上述方案已经非常成熟,但唯一的缺点在于,用户需要在两个系统之间切换,过程比较繁琐。为了让用户操作更简单,我们基于 Flink Catalog 提供了更加简单的使用体验。
下面我们来看看怎么样基于 Flink Catalog 去构建基于 Flink+Hologres 的 Streaming Warehouse。我们会发现,有了 Flink Catalog 后,整个使用体验会很简单,并能充分发挥 Flink 和 Hologres 两个产品的强大能力。
上图是一个典型的 Flink+Hologres 实时 ETL 链路;
ODS 层、DWD 层、ODS 层的数据都存在 Hologres 中。
链路中所有的数据加工都是通过 Flink SQL 完成。在整个 ETL 链路中,用户不需要任何 Hologres SQL,直接写 Flink SQL 即可。
Flink 用户可以通过 Flink SQL 对每层中的 Hologres 数据进行数据探查(流模式和批模式都可以)。比方说,当我们发现 DWS 层的数据结果出现问题,需要查看哪层的结果有问题或逻辑有错误。此时,我们可以复用原来的 Flink SQL 来进行探查、定位或者数据重新消费。
Hologres 中的每层数据都可以对外提供查询和服务(通过 Hologres SQL)。
接下来,以某个电商场景为例,演示一下基于 Flink Catalog 的 Streaming Warehouse。如上图所示,有一个 MySQL 数据库作为订单库,里面有订单表 orders、订单支付表 orders_pay、以及产品品类表 product_catalog。
第一步,我们通过 Flink 的实时数仓,把数据实时同步到 Hologres 里,形成 ODS 层。
第二步,加工 DWD 层。将 DWD 层的数据写到 Hologres 里。在这个过程中,我们需要把订单表和订单支付表,合并成一张表,实现多路合并。与此同时,我们希望 orders 表关联商品品类表 product_catalog。
第三步,驱动下游计算,构建 DWS 层。以用户维度和商店维度,收集统计数据。比如用户每天的订单金额和商店每天的订单金额,从而形成一条完整的链路。
第四步,将 DWS 层的表推荐给系统使用。作为用户和商店的特征,用做推荐用途。
第五步,DWD 层的表能够直接用来做实时统计分析、统计产品、实时大屏、实时报表。
上图中的绿色链路,全部使用 Flink SQL 完成。橙色链路对外提供服务,由 Hologres SQL 完成。
接下来,讲一讲每个步骤是如何运行的。
第一步,在 Flink 实时数仓,形成 ODS 层。首先,创建一个 Hologres 的 Catalog。MySQL 中存储订单、支付以及商品信息 3 张表,通过 Flink Catalog 功能,将 MySQL 整库的数据实时同步至 Hologres,形成 ODS。相关代码如上图所示。我们可以看到,Mysql 整库同步到 Hologres,通过 Flink SQL 来表达是非常简单的。
第二步,DWD 实时构建。数据实时写入 ODS 层后,Flink 读取 Hologres Binlog,并用多流合并、维表关联将订单、交易、商品 3 个表打成一个大宽表,实时写入至 Hologres 的订单汇总表中,形成 DWD 层。
左图中的 SQL 是 DWD 层表的建表语句。这张目标表包含了来自 orders、orders_pay、product_catalog 的字段,关联了相关的用户信息、商户信息、订单信息、商品品类信息等等,形成了一张宽表。
右图中的 SQL 是真正的计算逻辑。这里包含两个 INSERT 语句。
第一个 INSERT 语句是从 orders 表实时打宽后写入。这里用到了 Hologres 的维表关联能力。实时打宽后,写入目标表的部分字段。
第二个 INSERT 语句是从 orders_pay 表实时同步到同一张目标表,更新另外一些字段。
这两个 INSERT 语句最大的关联在于,它们写的是同一张表,会自动利用目标表的主键 ID 进行关联。每个 INSERT 都是做了目标表的局部更新,两者的合力结果是实时更新的目标宽表。
在上述过程中,它用了多种 Hologres 能力。比如驱动 Flink 做构建 DWD 层表;维表关联能力;局部更新能力等等。
第三步,DWS 层的实时聚合。在 DWD 的基础上,通过 Flink 读取 Hologres DWD 的 Binlog 数据,进行实时指标聚合计算,比如按照用户维度聚合,按照商户维度聚合等,然后实时写入 Hologres,形成 DWS 层。
上图中,左边是 DDL 语句,右边是聚合语句。经过简单的三步后,Flink SQL 构建了完整的 Streaming Warehouse 分层体系。
第四步,构建应用。基于 DWS 层,对外提供服务。数据的分层和加工完成后,业务就可以通过 Hologres 查询数据并应用。在这个例子里,推荐系统要求非常高的点查性能,所以要求百万级的 QPS 检查能力。Hologres 的行存表或者行列共存表完全可以满足。
这个方案和传统的实时数仓最大的差别是:传统的实时数仓只有最后一层的数据,可对外提供服务。而在 Hologres 里,DWD 等中间层数据也可以对外提供服务,进行实时报表统计。用户可以在中间层进行查询操作,对接各种实时应用、实时大屏。比如直接查 DWD 层的数据,典型的如根据用户 ID 返回推荐商品(KV 场景),以及实时报表查看订单量和退单量(OLAP)。
第五步,问题排查:Flink 数据探查。如果某个业务指标出现异常,Flink 可以直接探查每层表的数据来快速定位。比如用 Flink 探查 Hologres DWD 层的 orders 表。Hologres 支持 Flink 的流模式和批模式对数据的探查。
由于流模式是 Flink 的默认模式,因此我们不需要设置执行模式。它可以直接记录数据变化,从而非常方便的查看数据异常。流模式可以探查获取一段时间范围内的数据及其变化情况。
与此同时,批模式探查是获取当前时刻的最新数据。Hologres 也支持 Flink 批模式的数据探查。批模式和流模式的区别在于,流模式关注的是变化,批模式关注的是表中的最新状态。
[图片上传失败...(image-ce588d-1676454607375)]
综上所述,当 Hologres 跟 Flink 深度整合,就可以构建强大的 Streaming Warehouse。全链路都可以用 SQL 表示,并且只需要用到 Flink 和 Hologres 两个组件,操作非常方便。
实时 ETL 链路、数据分层完全可以用 Flink SQL 实现。对外查询可以用强大的 Hologres 计算引擎来做,每层数据可复用、可查,方便构建实时数仓的数据分层和复用体系。
这种体系有着很好的性能。Hologres 有非常强的实时写入、实时更新能力和强大的 OLAP、点查能力,Flink 有着非常强的实时加工能力。
用户可以基于这个方案,利用 Hologres 强大的百万 QPS 点查能力和高性能 OLAP 能力构建各种实时应用。
与此同时,我们有很多企业级能力,让大家的运维更简单,可观测性更好,安全能力更强,从而更加方便的构建企业级的 Streaming Warehouse。
点击查看直播回放 & 演讲PPT