BIGO 使用 Flink 做 OLAP 分析及实时数仓的实践和优化
文|王连平(诨名:烨川 )
首要内容包含:
- 事务布景
- 落地实践 & 特征改善
- 运用场景
- 未来规划
一、事务布景
BIGO 是一家面向海外的以短视频直播事务为主的公司, 现在公司的首要事务包含 BigoLive (全球直播服务),Likee (短视频创造同享渠道),IMO (免费通信工具) 三部分,在全球规模内拥有 4 亿用户。伴随着事务的发展,对数据渠道处理才能的要求也是越来越高,渠道所面临的问题也是日益凸显,接下来将介绍 BIGO 大数据渠道及其所面临的问题。BIGO 大数据渠道的数据流通图如下所示:
用户在 APP,Web 页面上的行为日志数据,以及关系数据库的 Binlog 数据会被同步到 BIGO 大数据渠道音讯行列,以及离线存储体系中,然后通过实时的,离线的数据剖析手法进行核算,以运用于实时引荐、监控、即席查询等运用场景。然而存在以下几个问题:
- OLAP 剖析渠道进口不一致:Presto/Spark 剖析使命进口并存,用户不清楚自己的 SQL 查询适宜哪个引擎履行,盲目挑选,体会不好;别的,用户会在两个进口一起提交相同查询,以更快的获取查询成果,导致资源浪费;
- 离线使命核算时延高,成果产出太慢:典型的如 ABTest 事务,经常核算到下午才核算出成果;
- 各个事务方依据自己的事务场景独立开发运用,实时使命烟囱式的开发,缺少数据分层,数据血缘。
面临以上的问题,BIGO 大数据渠道建造了 OneSQL OLAP 剖析渠道,以及实时数仓。
- 通过 OneSQL OLAP 剖析渠道,一致 OLAP 查询进口,削减用户盲目挑选,提高渠道的资源利用率;
- 通过 Flink 构建实时数仓使命,通过 Kafka/Pulsar 进行数据分层;
- 将部分离线核算慢的使命迁移到 Flink 流式核算使命上,加速核算成果的产出;
别的建造实时核算渠道 Bigoflow 办理这些实时核算使命,建造实时使命的血缘关系。
二、落地实践 & 特征改善
2.1 OneSQL OLAP 剖析渠道实践和优化
OneSQL OLAP 剖析渠道是一个集 Flink、Spark、Presto 于一体的 OLAP 查询剖析引擎。用户提交的 OLAP 查询请求通过 OneSQL 后端转发到不同履行引擎的客户端,然后提交对应的查询请求到不同的集群上履行。其全体架构图如下:
该剖析渠道全体结构从上到下分为进口层、转发层、履行层、资源办理层。为了优化用户体会,削减履行失利的概率,提高各集群的资源利用率,OneSQL OLAP 剖析渠道完成了以下功用:
- 一致查询进口:进口层,用户通过一致的 Hue 查询页面进口以 Hive SQL 语法为标准提交查询;
- 一致查询语法:集 Flink、Spark、Presto 等多种查询引擎于一体,不同查询引擎通过适配 Hive SQL 语法来履行用户的 SQL 查询使命;
- 智能路由:在挑选履行引擎的过程中,会依据历史 SQL 查询履行的状况 (在各引擎上是否履行成功,以及履行耗时),各集群的繁忙状况,以及各引擎对该 SQL 语法的是否兼容,来挑选适宜的引擎提交查询;
- 失利重试:OneSQL 后台会监控 SQL 使命的履行状况,假如 SQL 使命在履行过程中失利,将挑选其他的引擎履行重试提交使命;
如此一来,通过 OneSQL OLAP 剖析渠道,BIGO 大数据渠道完成了 OLAP 剖析进口的一致,削减用户的盲目挑选,一起充分利用各个集群的资源,削减资源空闲状况。
2.1.1 Flink OLAP 剖析体系建造
在 OneSQL 剖析渠道上,Flink 也作为 OLAP 剖析引擎的一部分。Flink OLAP 体系分红两个组成部分:Flink SQL Gateway 和 Flink Session 集群;SQL Gateway 作为 SQL 提交的进口,查询 SQL 通过 Gateway 提交到 Flink Session 集群上履行,一起获取 SQL 履行查询的进展,以及回来查询的成果给客户端。其履行 SQL 查询的流程如下:
首要用户提交过来的 SQL,在 SQL Gateway 进行判别:是否需求将成果耐久化写入到 Hive 表,假如需求,则会先通过 HiveCatalog 的接口创建一个 Hive 表,用于耐久化查询使命的核算成果;之后,使命通过 SQL Gateway 上履行 SQL 解析,设置作业运转的并行度,生成 Pipeline 并提交到 Session 集群上履行。
为了确保整个 Flink OLAP 体系的稳定性,以及高效的履行 SQL 查询,在这个体系中,进行了以下功用增强:
-
稳定性:
- 依据 zookeeper HA 来确保 Flink Session 集群的可靠性,SQL Gateway 监听 Zookeeper 节点,感知 Session 集群;
- 操控查询扫描 Hive 表的数据量,分区个数,以及回来成果数据量,防止 Session 集群的 JobManager,TaskManager 因此呈现 OOM 状况;
-
功用:
- Flink Session 集群预分配资源,削减作业提交后申请资源所需的时刻;
- Flink JobManager 异步解析 Split,Split 边解析使命边履行,削减因为解析 Split 堵塞使命履行的时刻;
- 操控作业提交过程中扫描分区,以及 Split 最大的个数,削减设置使命并行所需求的时刻;
-
Hive SQL 兼容:
针对 Flink 对于 Hive SQL 语法的兼容性进行改善,现在针对 Hive SQL 的兼容性大致为 80%;
-
监控告警:
监控 Flink Session 集群的 JobManager,TaskManager,以及 SQL Gateway 的内存,CPU 运用状况,以及使命的提交状况,一旦呈现问题,及时告警和处理;
2.1.2 OneSQL OLAP 剖析渠道取得的成果
依据以上完成的 OneSQL OLAP 剖析渠道,取得了以下几个收益:
- 一致查询进口,削减用户的盲目挑选,用户履行出错率下降 85.7%,SQL 履行的成功率提高 3%;
- SQL 履行时刻缩短 10%,充分利用了各个集群的资源,削减使命排队等待的时刻;
- Flink 作为 OLAP 剖析引擎的一部分,实时核算集群的资源利用率提高了 15%;
2.2 实时数仓建造和优化
为了提高 BIGO 大数据渠道上某些事务目标的产出功率,以及更好的办理 Flink 实时使命,BIGO 大数据渠道建造了实时核算渠道 Bigoflow,并将部分核算慢的使命迁移到实时核算渠道上,通过 Flink 流式核算的方法来履行,通过音讯行列 Kafka/Pulsar 来进行数据分层,构建实时数仓;在 Bigoflow 上针对实时数仓的使命进行渠道化办理,树立一致的实时使命接入进口,并依据该渠道办理实时使命的元数据,构建实时使命的血缘关系。
2.2.1 建造计划
BIGO 大数据渠道首要依据 Flink + ClickHouse 建造实时数仓,大致计划如下:
依照传统数据仓库的数据分层方法,将数据划分红 ODS、DWD、DWS、ADS 等四层数据:
- ODS 层:依据用户的行为日志,事务日志等作为原始数据,存放于 Kafka/Pulsar 等音讯行列中;
- DWD 层:这部分数据依据用户的 UserId 通过 Flink 使命进行聚合后,形成不同用户的行为明细数据,保存到 Kafka/Pulsar 中;
- DWS 层:用户行为明细的 Kafka 流表与用户 Hive/MySQL 维表进行流维表 JOIN,然后将 JOIN 之后产生的多维明细数据输出到 ClickHouse 表中;
- ADS 层:针对 ClickHouse 中多维明细数据依照不同维度进行汇总,然后运用于不同的事务中。
依照以上计划建造实时数据仓库的过程中,遇到了一些问题:
- 将离线使命转为实时核算使命后,核算逻辑较为杂乱 (多流 JOIN,去重),导致作业状况太大,作业呈现 OOM (内存溢出) 异常或许作业算子背压太大;
- 维表 Join 过程中,明细流表与大维表 Join,维表数据过多,加载到内存后 OOM,作业失利无法运转;
- Flink 将流维表 Join 产生的多维明细数据写入到 ClickHouse,无法确保 Exactly-once,一旦作业呈现 Failover,就会导致数据重复写入。
2.2.2 问题处理 & 优化
优化作业履行逻辑,减小状况
离线的核算使命逻辑较为杂乱,涉及多个 Hive 表之间的 Join 以及去重操作,其大致逻辑如下:
当将离线的作业转为 Flink 的流式使命之后,原先离线 Join 多个 Hive 表的场景就转变为 Join 多个 Kafka Topic 的场景。 因为 Join 的 Kafka topic 的流量较大,且 Join 的窗口时刻较长 (窗口最长的为 1 天),当作业运转一段时刻内,Join 算子上就积累了很多的状况 (一小时后状况就接近 1T),面临如此大的状况,Flink 作业采纳 Rocksdb State Backend 来存放状况数据,可是依然避免不了 Rocksdb 内存运用超越导致被 YARN kill 的问题,或许是 Rocksdb State 上存的状况太多,吞吐下降导致作业严重背压。
针对这个问题,咱们将这多个 Topic,依照相同的 Schema 进行 Unoin all 处理,得到一个大的数据流,然后在这个大的数据流中,再依据不同事情流的 event_id 进行判别,就能知道这条数据来自哪一个事情流的 Topic,再进行聚合核算,获取对应事情流上的核算目标。
这样一来,通过 UNION ALL 代替 JOIN,避免了因为 JOIN 核算带来的大 State 带来的影响。
别的,在核算使命中还存在有比较多的 count distinct 核算,相似如下:
select count(distinct if(events['a'] = 1, postid, null)) as cnt1, count(distinct if(events['b'] = 1, postid, null)) as cnt2
…… count(distinct if(events['x'] = 1, postid, null)) As cntx From table_a Group by uid
这些 count distinct 核算在同一个 group by 中,并依据相同的 postid 进行去重核算,因而可以让这些 distinct state 可以同享一组 key 来进行去重核算,那么就可以通过一个 MapState 来存储这若干个 count distinct 的状况,如下:
这些 count distinct 函数去重的 key 相同,因而可以同享 MapState 中的 key 值,然后优化存储空间;而 Mapstate 的 Value 是 Byte 数组,每个 Byte 8 个 bit,每个 bit 为 0 或许 1,第 n 个 bit 对应了 n 个 count distinct 函数在该 key 上的取值:1 标明该 count disitnct 函数在对应的 key 上需求进行计数,0 标明不需求计数;当核算聚合成果的时候,则将一切 key 第 n 位的数字相加,即为第 n 个 count distinct 的取值,这样一来,就更进一步节约了状况的存储空间。
通过以上优化,成功的将 ABTest 的离线使命迁移到 Flink 流式核算使命上,将作业的状况操控在 100GB 以内,让作业正常的运转起来。
流维表 JOIN 优化
生成多维明细宽表的过程中,需求进行流维表 JOIN, 运用了 Flink Join Hive 维表的功用:Hive 维表的数据会被加载到使命的 HashMap 的内存数据结构中,流表中的数据再依据 Join Key 与 HashMap 中的数据进行 Join。可是面临上亿,十亿行的 Hive 大维表,加载到内存的数据量太大,很容易导致 OOM (内存溢出)。针对以上问题,咱们将 Hive 大维表依照 Join Key 进行 Hash 分片,如下图:
这样一来,Hive 大维表的数据通过 Hash 函数核算后散布到 Flink 作业的不同并行子使命的 HashMap 中,每个 HashMap 只存扩大维表的一部分数据,只要作业的并行度够大,就可以将大维表的数据拆分红足够多份,进行分片保存;对于一些太大的维表,也可以采纳 Rocksdb Map State 来保存分片数据。
Kafka 流表中的数据,当要下发到不同的 subtask 上进行 Join 时,也通过相同的 Join Key 依照相同的 Hash 函数进行核算,然后将数据分配到对应的 subtask 进行 Join,输出 Join 后的成果。
通过以上优化,成功 Join 了一些 Hive 大维表使命来履行流维表 Join 核算,最大的维表超越 10 亿行。
ClickHouse Sink 的 Exactly-Once 语义支撑
将流维表 Join 生成的多维明细数据输出到 ClickHouse 表的过程中,因为社区的 ClickHouse 不支撑事务,所以没办法确保数据 sink 到 ClickHouse 过程中的 Exactly-Once 语义。在此过程中,一旦呈现作业 Failover,数据就会重复写入到 ClickHouse。
针对这个问题,BIGO ClickHouse 完成了一个二阶段提交事务机制:当需求写入数据到 ClickHouse 时,可以先设置写入的模式为 temporary,标明现在写入的数据是暂时数据;当数据履行刺进完成后,回来一个 Insert id,然后依据该 Insert id 履行 Commit 操作,那么暂时数据就转为正式数据。
依据 BIGO ClickHouse 的二阶段提交事务机制,并结合 Flink 的 checkpoint 机制,完成了一个 ClickHouse Connector,确保 ClickHouse Sink 的 Exactly Once 写入语义,如下:
- 在正常写入的状况下,Connector 随机挑选 ClickHouse 的某一个 shard 写入,依据用户配置写单副本,或许双副原本履行 insert 操作,并记录写入后的 insert id;在两次 checkpoint 之间就会有屡次这种 insert 操作,然后产生多个 insert id,当 checkpoint 完成时,再将这些 insert id 批量提交,将暂时数据转为正式数据,即完成了两次 checkpoint 间数据的写入;
- 一旦作业呈现 Failover,Flink 作业 Failover 重启完成后,将从最近一次完成的 checkpoint 来康复状况,此刻 ClickHouse Sink 中的 Operator State 可能会包含上一次还没有来得及提交完成的 Insert id,针对这些 insert id 进行重试提交;针对那些数据已经写入 ClickHouse 中之后,可是 insert id 并没有记录到 Opeator State 中的数据,因为是暂时数据,在 ClickHouse 中并不会被查询到,一段时刻后,将会由 ClickHouse 的过期整理机制,被整理掉,然后确保了状况回滚到上一次 checkpoint 之后,数据不会重复。
通过以上机制,成功确保了数据从 Kafka 通过 Flink 核算后写入到 ClickHouse 整个链路中端到端的 Exactly-Once 语义,数据不重复也不丢掉。
2.2.3 渠道建造
为了更好的办理 BIGO 大数据渠道的实时核算使命,公司内部建造了 BIGO 实时核算渠道 Bigoflow,为用户供给一致的 Flink实时使命接入,渠道建造如下:
- 支撑 Flink JAR、SQL、Python 等多种类型作业;支撑不同的 Flink 版本,掩盖公司内部大部分实时核算相关事务;
- 一站式办理:集作业开发、提交、运转、历史展现、监控、告警于一体,便于随时查看作业的运转状况和发现问题;
- 血缘关系:便利查询每个作业的数据源、数据意图、数据核算的来龙去脉。
三、运用场景
3.1 Onesql OLAP 剖析渠道运用场景
Onesql OLAP 剖析渠道在公司内部的运用场景是:运用于 AdHoc 查询,如下:
用户通过 Hue 页面提交的 SQL,通过 OneSQL 后端转发给 Flink SQL Gateway,并提交到 Flink Session 集群上履行查询使命,Flink SQL Gateway 获取查询使命的履行进展回来给 Hue 页面,并回来查询成果。
3.2 实时数据仓库运用场景
实时数据仓库运用场景现在首要是 ABTest 事务,如下:
用户的原始行为日志数据通过 Flink 使命聚合后生成用户明细数据,然后与维表数据进行流维表 JOIN,输出到 ClickHouse 生成多维明细宽表,依照不同维度汇总后,运用于不同的事务。通过改造 ABTest 事务,将该事务的成果目标的生成时刻提前了 8 个小时,一起削减了运用资源一倍以上。
四、未来规划
为了更好的建造 OneSQL OLAP 剖析渠道以及 BIGO 实时数据仓库,实时核算渠道的规划如下:
- 完善 Flink OLAP 剖析渠道,完善 Hive SQL 语法支撑,以及处理核算过程中呈现的 JOIN 数据倾斜问题;
- 完善实时数仓建造,引入数据湖技能,处理实时数仓中使命数据的可重跑回溯规模小的问题;
- 依据 Flink 打造流批一体的数据核算渠道。
我有话说: