公司简介:芒果 TV 作为湖南广电旗下互联网视频平台,在“一云多屏,多元一体”的战略指导下,通过内容自制,培植核心竞争力,从独播、独特走向独创,并通过市场化运作完成 A 轮、B 轮融资,并于 2018 年 6 月,顺利实现资产重组,成为国内 A 股首家国有控股的视频平台。
一、芒果 TV 实时数仓建设历程
芒果 TV 实时数仓的建设共分为三个阶段,14-19 年为第一阶段,技术选型采用 Storm/Flink Java+Spark SQL。20-22 年上半年为第二阶段,技术选型采用 Flink SQL+Spark SQL 。22 年下半年-至今为第三阶段,技术选型采用 Flink SQL+StarRocks。每一次升级都是在原有基础上进行迭代,以求更全面的功能,更快的速度,能更好的满足业务方的需求。接下来逐一介绍。
第一代基于 Storm/Flink Java+Spark SQL
芒果 TV 的实时数据处理很早就开始了,最开始用的是 Storm,到了 18 年时,Flink 横空出世。 Flink 的 State 与流处理的优势让人眼前一亮,并且开源社区的大热与大厂的相继入坑,让人无法拒绝,所以改用了 Flink 来搭建实时数仓,但当时主要以满足业务方需求为主,进行烟囱式的开发。基本流程是接上游 Kafka 的数据,使用 Flink Java 进行相关业务逻辑处理后,将数据输出至对象存储中。然后使用 Spark SQL 对数据进行统计等二次加工处理后,再交付客户使用。此阶段优点是利用了 Flink 的长处,让数据从源头到终端更实时化了,满足了业务方对数据的时效性与业务需求。缺点是来一个需求就做一个功能,并未有实时数仓的建设与沉淀。
第二代基于 Flink SQL+Spark SQL
基于上一阶段的技术积累与发现的问题,提出了建设实时数仓的新方案。此时 Flink SQL 功能已初步完善,能满足搭建数仓的各方面需求,SQL 化相较 Flink Java 也能降低开发、维护等各方面成本。于是选择 Flink SQL 来搭建实时数仓。此阶段对实时数仓进行了分层架构设计,这个后面有详细讲解。基本流程是接上游 Kafka 数据进行格式化后输出至 Kafka,下层接到 Kafka 数据进行字段处理、垃圾数据过滤等操作后输出至 Kafka,最后一层接 Kafka 数据进行维度扩展,然后将数据写至对象存储中。再由 Spark SQL 读取对象存储中的数据进行统计等处理后,交付客户使用。此阶段的优点是实现了数仓的分层构架设计,对各层数据定义了标准化,实现了各层数据解耦,避免了烟囱式的开发,解决了重复开发等问题,实时数仓逐步走向成熟。缺点是使用 Spark SQL 进行后续统计与汇总时,不够灵活。需要提前设计好指标,面对客户多变的需求时,往往不能很及时的响应。
第三代基于 Flink SQL+StarRocks
随着实时数仓的建设逐步加深,Spark SQL 不够灵活,处理速度不够快的弊端越发突出。此时 StarRocks 进入了我们的视线,其 MPP 的架构、向量化引擎、多表 Join 等特性所展现出来在性能、易用性等方面的优势,都很好的弥补了 Spark SQL 在这块的不足。于是经调研后决定,在实时数仓中用 StarRocks 替换掉 Spark SQL 。在此阶段,前面用 Flink SQL 搭建的实时数仓分层构架并未改变,而下游用 Spark SQL 进行统计分析的相关功能,逐步替换成了用 StarRocks 来做。而基于 StarRocks 的优势与搭建实时数仓遇到的痛点,我们并没有照搬之前 Spark SQL 的模式,而是选用了新的模式。使用 StarRocks 实现即席查询。之前是用 Spark SQL 先将数据进行统计与汇总后,将最终结果数据写入对象存储中。而现在是直接用 StarRocks 对明细数据进行汇总,展示到前端页面中。这么做的好处是能更快、更灵活的满足业务方的需求,减少了开发工作量,减少了测试、上线等时间。StarRocks 优秀的性能让即席查询速度并未变慢,功能更强大,更灵活,交付速度变更快了。
二、自研 Flink 实时计算调度平台介绍
现有痛点
原生任务命令复杂,调试麻烦,开发成本比较高。
连接器, UDF,Jar 任务包等无法管理,调试复杂,经常遇到依赖冲突问题。
无法做到统一的监控报警以及对资源上的权限管理。
SQL 任务开发复杂,没有一个好用的编辑器和代码管理及保存平台。
基础表, 维表, Catalog 没有记录和可视化的平台。
多版本和跨云任务无法很好的管理。
没有很好的日志管理机制,无法做到生产环境问题的快速定位。
平台架构设计
实时 Flink 调度平台架构图:
平台主要分为三个部分:
-
Phoenix Web 模块主要负责面向用户。
- 集群部署与任务提交。
- 公司各内部业务权限管理。
- 支持 Catalog 及多源源信息管理。
- UDF,连接器等三方依赖 Jar 包管理。
- 多类型监控报警以及日志管理。
- SQL 可视化编辑和校验以及多版本存储。
-
Flink SQL Gateway 和 Flink Jar Gateway 都是基于开源版本修改定制后的服务,支持 SQL 符合业务场景的解析和校验以及 Jar 任务的提交,支持本地模式,Yarn-per-job 模式和 Application 模式,也支持自动的 Savepoint。
进行 SQL 的解析和校验。
加载 SQL 和 Jar 任务所需要的三方依赖。
SQL 任务连接 Catalog 存储进行关联和映射。
Checkpoint 和 Savepoint 的自动管理和恢复。
Jar 类型任务启动参数的注入。
运行时配置的自适应。
多类型的提交方式适配。
混合多云模块主要负责启动任务的分发和云之间的信息管理。
三、 Flink SQL 实时数仓分层实践
使用 Flink SQL 搭建实时数仓时,首要问题是数仓分层架构如何解决,业界内有许多优秀的经验可以参考,同时也基于我们的情况,最终采用了如下数仓架构:
ODS 层:原始日志层,在该层将上游 Binlog 日志、用户行为日志、外部数据等数据源同步至数仓,对多种数据源、多种格式的数据通过统一 UDF 函数解析、格式化,最终输出格式化 JSON 数据。
DW 层:数据明细层,在该层主要进行错误数据过滤、字段转义、统一字段名等处理,输出的数据已能满足日常基础分析的使用。
DM 层:数据模型层,在该层进行扩维,补充相关的公共信息。再按业务进行分域,输出的数据具有更丰富的维度,可以满足高级分析的数据使用需求。
ST 层:数据应用层,按业务、功能等维度进行汇总,交由给前端页面进行展现,输出的数据可交付 Web、App、小程序等功能使用。
四、Flink SQL 实时数仓生产过程遇到的问题
在搭建实时数仓时,遇到了不少的问题,下面挑几个典型的问题讲解一下解决思路:
1、多表关联,这个在做数仓时,是个非常重要且普遍会要用到的功能,我们在使用 Flink SQL 搭建实时数仓初期,面对 Flink 琳琅满目的 Join 类型着实挑花了眼,尤其是涉及多表关联时,有些维表的数据在 Hive 里,有些维表又在 MySQL 中,甚至还有些维表数据在其它 OLAP 中,该选择何种关联方式是当时摆在面前的一大问题,后经过多次尝试,在性能、功能性等方面的多方权衡之下,总结出如下规则:
流表关联维表(小数据量),使用 Lookup Join,维表数据量在十万以下时,可使用 Hive 表做维表,因为离线数仓中的维表数据大部分都在 Hive 中,这样的话就可以直接复用,省去数据导入导出的额外工作,并且性能方面没有瓶颈,维表小时更新后,Flink SQL 也能读到最新数据。
流表关联维表(大数据量),使用 Lookup Join,维表数据量在十万 – 千万以下时,可用 MySQL 表做维表,此时用 Hive 维表已不能满足性能需求。可将数据导出至 MySQL 中,利用缓存机制,也能很好的满足要求。
流表关联流表,使用 Interval Join,通过两个流表的时间字段来控制关联范围,这种关联方式是目前用的比较多的。使用方式也跟离线比较接近。
2、复杂的表处理,在一些数据清洗的复杂场景中,在关联维表时,维表的数据会要经过一层甚至多层的处理才能使用,离线数仓在这种场景下,可以直接在 Join 时写多层子查询来一步到位。但 Flink SQL 中不支持,在底层机制上就拒绝了。经过多次尝试与挣扎,最后采取的方案是在 Hive 中将维表数据进行预处理,实时数仓使用预处理后的维表数据。不过这只是一个过渡方案,目前从社区了解到,后续会有新的机制来实现在维表上进行任意的复杂计算后再做维表关联。不得不说 Flink 社区的更新还是非常的快。
3、State 过大,在两个流表进行关联或进行汇总统计时,Flink 的机制是会将数据缓存在 State 中。这就导致State 会过大,导致 GC 频繁,进而任务失败。针对这种情况,在研究了 Flink 的内存机制后,得出的解决方案如下:
- 缩短时间范围,根据业务需求,适当减少关联时两条流的时间范围。
- 调整 Managed Memory 大小,可以调整 Managed Memory 占比,适当的缩小其它内存的使用。
- 设置 State 的 TTL 来避免缓存过多的数据。
4、任务中频繁出现 Checkpoint expired before completing 异常,在实际生产环境中,发现有任务会频繁的报这个错误,这个错误是说 Checkpoint 不能顺利完成,因为 Flink 的 Checkpoint 有 Barrier 机制来保证数据的 ExactlyOnce 精确一致性,如果一批数据处理不完,Checkpoint 就完成不了,这块有兴趣的可以去了解一下。导致这个错误原因有多种,不同的问题也有不同的解答,接下来列举一下各场景与解决方案:
Checkpoint 超时时长太短,这个是比较常见,也比较好解决的一种情况。就是 Checkpoint 的超时时长设置的太短了,导致 Checkpoint 还没完成就被报了超时,解决方案就是设置长一点,我们一般根据任务类型,会设置 6 秒-2 分钟不等。
任务有背压,这个也很常见,一个任务内有多个操作,其中一个操作耗时较长影响了整个任务的执行。也会影响 Checkpoint 的完成,这其中涉及到,有兴趣的可以查一下。解决方案是可以从 WebUi 上找到执行缓慢的 Task,具体问题具体分析,解决了就好了。
-
内存不足,先说背景,我们在生产环境中一般使用 rocksdb statebackend,默认会保留全量 Checkpoint。而这种情况下,在遇到有关联、分组统计等使用了 heap statebackend 的任务中,计算的中间结果会缓存到 State 中,State 的内存默认是总内存的 40%,在这种计算中会不太够,从而导致频率的 GC,也影响了 Checkpoint 的执行。解决方案如下:
- 调大 TaskManager 的内存,TaskManager 的内存调大后,其它内存区域也会相应调大。
- 调大 Managed Memory 的内存占比,就是设置 taskmanager.memory.managed.fraction 这个参数,可根据实际情况来,实际生产中最高可调到 90%。这种方法只调大了 ManagedMemory 一块,如果内存资源并不是很充裕时,可以用这种方式。
- 改用增量 Checkpoint,根据实际情况调整 State 的 TTL 时间,并开启增量 Checkpoint。甚至都不用调内存大小,也能解决问题。
5、在 Flink SQL 中使用 if 函数时,一次偶然的发现,在返回 String 时,会按最大长度返回。什么意思呢,比如 if(condition, stringA, stringB),stringA 的长度是 10,stringB 的长度是 2,如果 condition = false,返回 stringB 的时候,会补齐 stringB 的长度到 10,不够的给空格。这是个需要注意的地方。但后续了解到目前该现象已在 1.16.3 版本修复了,而我们用的是 1.15,所以如果遇到了可以用 CaseWhen 替代或者升级 Flink 版本至 1.16.3 及以上即可解决。
五、StarRocks 选型背景及问题
在之前的的框架中我们是以Flink流式处理引擎完成原始日志的清洗,数据的打宽与轻度聚合,再落地到分布式文件系统或者对象存储,通过离线 Spark SQL 五分钟级别的调度批处理,结果会通过 Presto 等引擎去查询,这样的架构在生产环境中渐渐显露出很多问题。
例如:
存在重复计算的问题,原始数据会在不同的任务中反复清洗,有的需要多个原始数据的关联也会反复的清洗,大量浪费了计算资源,代码和数据流可重用性很差。
为了满足离线批处理历史累计值和当前 5 分钟窗口的计算指标,在流量高峰期和当日指标累计到晚上时很可能在 5 分钟之内无法完成指标的计算,有很大的超时风险,业务会反馈实时指标的延迟。
由于离线 Spark 批处理在多维组合分析并且又要求实时性情况下,略显乏力。 业务的在线化,催生出很多实时的场景,另一方面运营的精细化和分析的平民化也催生出多维的分析需求,这些场景下需要粒度特别细,维度特别丰富的底层数据,这两部分的叠加起来就催生出了实时多维分析的场景。这时候我们需要不断的增加维度组合,增加结果字段,增加计算资源来满足以上场景,但是还是略显乏力。
在数据时效性日益增加的今天,很多场景下数据的时效性提出了秒级毫秒级的要求,之前5分钟级别的方式不能满足业务需求。
在之前的实时任务中经常需要在 Flink 内存中做流和流的 Join,这些都需要在 Flink 任务内存中做,由于上游多个数据流的数据到达时间不一致,很难设计合适的 window 去在计算引擎里打宽数据,采用 Flink Interval Join 时多个流时间间隔太久状态数据数据会非常庞大,启用 mapState 之类的状态计算又过于定制。
对于 Flink 清洗或者计算的结果可能需要多个存储介质中,对于明细数据我们可能会存储在分布式文件系统或者对象存储,这时候是 Flink+HDFS,对于业务更新流数据,可能是 Flink CDC+hbase(cassandra或者其他 key-value 数据库),对于 Flink 产生回撤流数据可能是 Flink+MySQL(redis),对于风控类数据或者传统的精细化的看版可能是 Flink+ elasticsearch,对于大批量日志数据指标分析可能是Flink+clickhouse,难以统一,资源大量损耗,维护成本同样高。
在线上有大型活动或者大型节目时,实时数据量暴增,实时的大批量写入的情况下,写入延迟大,写入效率不高,数据积压。
总体分析,早期架构有这样一些问题。
数据源多样,维护成本比较高。
性能不足,写入延迟大,大促的场景会有数据积压,交互式查询体验较差。
各个数据源割裂,无法关联查询,形成众多数据孤岛。然后从开发的角度,每个引擎都需要投入相应的学习开发成本,程序复杂度比较高。
实时性要求高,并且开发效率快,代码或者数据可重复利用性强。
实时任务开发没有同一套标准,各自为战。
为此我们在测试环境下做了简单的性能对比,具体情况如下:
对比环境 StarRocks: 4 *16C*128G Presto: 22*32C*256G (非独占)
数据量:事件表(共百亿数据,日均千万去重用数)
测试用例 | Presto(s) | StarRocks(s) |
---|---|---|
单表聚合测试 | 13.1 | 5 |
关联测试 | 19 | 8 |
留存 | 24 | 15 |
窗口函数 | 16 | 8 |
漏斗 | 3.5 | 3.2 |
多表关联 | 36 | 19 |
本次测试使用了 4 台16C128G 内存的 BE 服务器,测试结论基本能够满足百亿条数据的查询需求。测试结果表明资源在相差很多的情况下,StarRocks 的性能还明显优于 Presto,且平均效率提升 2-3 倍。
六、基于 Flink SQL+StarRocks 实时分析数仓
基于已经搭建完毕的 Flink SQL 的数仓分层体系,且由 StarRocks2.5X 版本升级到 StarRocks3.0X 存算分离版本并已大规模投入在生产环境中。
实时和离线湖仓一体的架构图:
明细模型
在大数据生产环境中最常见的日志数据,特点是数据量大,多维度灵活复杂的计算,计算指标多,实时性强,秒级别的高性能查询,简单稳定实时流写入,大表的 Join,高基数去重。
这些要素对于 Flink SQL+StarRocks 都能满足,首先实时平台上使用 Flink SQL 快速对实时流日志数据进行清洗,打宽,同时 StarRocks 提供 Flink-Connector-StarRocks 连接器开箱即用,并且支持 ExactlyOnce 和事务支持,通过 Stream Load 低延时快速导入。
例如:
通过高效简单的 Flink SQL 建表模式,批量百万级写入,速度快,同时生产环境单表十亿级别以上数据计算多维度用户访问次数,和用户去重数据,能达到秒级别。
主键模型
在 OLAP 数据仓库中,可变数据通常是不受欢迎的。
对于数仓中的数据变更方式:
方式一:一些OLAP数据仓库提供 Merge on Read 模型的更新功能,完成数据变更,例如(clickhouse)。
方式二:简单来说就是创建新分区表,删除老的分区表的数据,然后批量刷写过去。
在新的分区中插入修改后的数据,通过分区交换完成数据变更。
通过批量刷写的方式会要重新建表,删除分区数据,刷写数据过程繁杂,还可能导致出错。
Merge on Read 模式在写入时简单高效,但读取时会消耗大量的资源在版本合并上,同时由于 merge 算子的存在,使得谓词无法下推、索引无法使用,严重的影响了查询的性能。StarRocks 提供了基于 Delete and Insert 模式的主键模型,避免了因为版本合并导致的算子无法下推的问题。主键模型适合需要对数据进行实时更新的场景,可以更好的解决行级别的更新操作,支撑百万级别的 TPS,特别适合 MySQL 或其他业务库同步到 StarRocks 的场景。
而且通过 Flink CDC 和 StarRocks 完美结合可以实现业务库到 OLAP 数据仓库端到端的全量+增量的实时同步,一个任务可以搞定批量和实时的全部问题,并且高效稳定。同时主键模型也可以解决 Flink 中回撤流输出的问题,支持按条件更新,支持按列更新,这些都是传统 OLAP 数据库很多不兼具的优点。
Flink CDC+StarRocks 的模式可以解决生产环境中很多问题, StarRocks 和 Flink 结合去构建实时数据分析体系的联合解决方案,将在一定程度上颠覆既有的一些禁锢,形成实时数据分析新范式,加速融合实时日志数据和业务数据,也能解决传统离线数据批量抽取的问题,实现了离线和实时在数据上的统一,加快流批一体的进程。
聚合模型
在实时数仓中还有一种场景,我们不太关心原始的明细数据,多为汇总类查询,比如 SUM、MAX、MIN 等类型的查询,旧数据更新不频繁,只会追加新的数据,这个时候可以考虑使用聚合模型。建表时,支持定义排序键和指标列,并为指标列指定聚合函数。当多条数据具有相同的排序键时,指标列会进行聚合。在分析统计和汇总数据时,聚合模型能够减少查询时所需要处理的数据,提升查询效率。
在之前我们可能会把这些操作放在 Flink 里面去统计,状态数据会存在在内存中,会导致状态数据持续增长,并且消耗大量资源,将 Flink 的单纯统计修改为 Flink SQL+StarRocks 聚合模型,Flink 这里只需要对明细数据进行清洗并导入到 StarRocks,效率非常高且稳定。
我们在实际生产中主要用来统计用户观看时长,点击量,订单统计等。
物化视图
数据仓库环境中的应用程序经常基于多个大表执行复杂查询,通常涉及多表之间数十亿行数据的关联和聚合。要实现这种实时多表关联并查询结果的方式,在之前我们可能会把此项内容放在 Flink 实时数仓中去处理,分层处理关联,合并,统计等任务,最后输出结果层数据,处理此类查询通常会大量消耗系统资源和时间,造成极高的查询成本。
现在可以考虑使用 Flink SQL+StarRocks 的新思路去处理这种大规模的分层计算问题,使得 Flink SQL 这里只需要处理一些简单清洗任务,把大量重复计算的逻辑下推到 StarRocks 去执行,多个实时流实时落地,在 StarRocks 可以建立多级物化视图的建模方式,StarRocks 的物化视图不仅支持内表和内表关联,也支持内表和外表关联,比如你的数据是在 MySQL,Hudi,Hive 等都可以通过 StarRocks 物化视图的方式查询加速,并设定定期刷新规则,从而避免手动调度关联任务。其中最大的一个特点时,我们已经建立的物化视图,当有新的查询对已构建了物化视图的基表进行查询时,系统自动判断是否可以复用物化视图中的预计算结果处理查询。如果可以复用,系统会直接从相关的物化视图读取预计算结果,以避免重复计算消耗系统资源和时间。查询的频率越高或查询语句越复杂,性能增益就会越很明显。
实时即未来,StarRocks 在逐渐实现这样的能力,StarRocks 和 Flink 结合去构建实时数据分析体系的联合解决方案,将在一定程度上颠覆既有的一些禁锢,形成实时数据分析新范式。
七、未来展望
湖仓一体
当前芒果 TV 已经实现了流批一体的数仓建设,而未来的重点是湖仓一体的建设。
数据湖的特点在于可以存储各种类型和格式的原始数据,包括结构化数据、半结构化数据和非结构化数据。而数据仓库则是对数据进行结构化和整理,以满足特定的业务需求。
湖仓一体将数据仓库和数据湖的特点融合在一起,打造一个统一的数据中心,实现对数据的集中管理。湖仓一体的架构能够提供更好的安全性、成本效益和开放性,既能够存储和管理大量原始数据,又能够将数据整理成结构化的形式,为分析和查询提供便利。
通过建立湖仓一体,芒果 TV 能够向公司内部提供更丰富的数据服务,支持业务决策和创新,实现对数据的全面掌控和管理,包括数据的采集、存储、处理和分析。同时,湖仓一体还能够支持多种计算引擎和工具的使用,如 Flink、Spark、Hive 等,使得数据处理和分析更加灵活和高效。
低代码
现在的开发方式是在自研的平台上写 SQL 提交任务,这种方式在面对一些清洗场景时,大部分是重复工作,有较大的提升空间。低代码是时下比较热门的概念,其在降本增效方面的优势很大。我们的下一步的计划是逐步实现低代码,第一阶段是将实时平台与数据上报平台进行打通,通过读取上报平台里相关元数据,能够自动生成对应的数据清洗任务,解放生产力,提升工作效率与交付速度。
低代码的优势在于它能够将开发过程中的重复工作进行自动化和简化,减少了开发人员的编码工作量。通过可视化的方式,开发人员可以通过拖拽和配置来完成任务,而无需编写大量的代码。这不仅提高了开发效率,还降低了出错的风险。
通过实现低代码的开发方式,芒果 TV 将能够加快数据处理和分析的速度,提高团队的整体效率。此外,低代码还能够降低对开发人员的技术要求,使得更多的人能够参与到数据处理和分析的工作中。
总结而言,基于 Flink 技术的特点,芒果 TV 在未来的数仓建设中将注重实现湖仓一体的架构,以实现对数据的全面管理和利用。同时,芒果 TV 计划逐步实现低代码的开发方式,以提高开发效率和交付速度。这些举措将进一步推动芒果 TV 在长视频数据分析领域的发展,为业务决策和创新提供更强大的支持。