FlinkCDC

Flink CDC 2.0 无锁全量 Dump 数据技术实现 - 掘金 (juejin.cn)

Flink CDC简介

什么是CDC

CDC(Change Data Capture)它是捕获数据变更技术。在广义上,只要能捕获数据变更的技术,我们都可以成为CDC。通常我们说的CDC技术主要是面向数据库的变更,是一种用于捕获数据库中数据变更的技术。

CDC的实现方式:

  • 基于查询

    • 优点:实现相对简单,无需特别的配置和权限
    • 缺点:实时性基于调度频率,对DB压力较大,无法保证数据的一致性
  • 基于日志

    • 优点:实时消费变更日志,对DB压力较小,保证数据的一致性
    • 缺点:实现复杂,需要数据配置和账号权限

Flink CDC

Flink CDC Connerctors是Flink的一组Source连接器,是Flink CDC的核心组件,这些连接器负责从MySQL、PostgreSQL等数据库读取历史数据和增量变更。Flink CDC Connectors 是一个独立的开源项目,

避免重新拉取全量

概念上讲,Flink的Savepoints与CheckPoint的区别类似于传统数据库中备份与恢复日志的区别。

CheckPoint的主要目的是在以外作业失败时提供恢复机制。CheckPoints的生命周期由Flink管理,即CheckPoints由Flink创建拥有和释放,无需用户交互。用于Checkpoints经常被处罚,并且在故障恢复中起着重要作用,Checkpoints实现的俩个主要涉及目标是 1.创建轻量级2.从恢复中的速度尽可能快。针对这些目标的优化可以利用某些属性,例如作业代码在执行尝试之间不会发生变化。

如果用户终止应用程序(除非明确配置保留Checkpoints),checkpoints江北自动删除。

Checkpoints一状态后端特定(本地)数据格式存储(根据具体的后端,可能是增量的)。

尽管SavePoints是使用Checkpoints相同的内部机制创建的,但它们在概念上是不同的,并且在创建和回复方面可能稍微更昂贵。它们的设计更加注重数据的可移植性和灵活性,也别是对作业更改的处理。

SavePoint是用户计划的,手动的操作。仅由用户创建拥有和删除。这意味着。Flink在作业终止回复后不会删除Savepoint。

savepoints以状态后端无关的规范的格式存储

示例

假设你有一个实时的电商应用程序,用于处理用户的订单数据。你希望在应用程序处理订单时定期创建Checkpoints,以便在发生故障时能够恢复到最近的一致状态。每隔10分钟,Flink会自动创建一个Checkpoints,并将应用程序的状态(包括读取位置和处理状态)存储在可靠的存储系统中(如分布式文件系统)。

现在,假设你要对该应用程序进行升级,引入一个新的优化算法来提高订单处理的效率。在进行升级之前,你希望先手动创建一个Savepoint,以便在升级后能够回到应用程序的先前状态。你手动触发了一个Savepoint操作,并将应用程序的当前状态存储在分布式文件系统中的特定目录中。

然后,你可以进行应用程序的升级操作,修改算法并重新部署应用程序。一旦升级完成,你可以使用之前创建的Savepoint来恢复应用程序的状态。Flink会加载Savepoint,并从中恢复应用程序的读取位置和处理状态,以便继续处理订单数据。这样,你可以在应用程序的升级过程中保持数据一致性,避免数据丢失或重复处理的问题。

总结

  • Checkpoints用于定期创建应用程序的一致状态快照,以便在故障发生时进行恢复。
  • Savepoints则用于手动创建应用程序的快照,以便在需要进行升级,更改或重新部署时能够恢复到先前的状态。

全量无锁Dump数据技术实现

mysqlCDC

MySQL CDC 的底层是基于 debezium 来实现的,它的 1.x 版本完全基于 debezium,2.x 版本则重新实现了 全量阶段 的同步逻辑。1.x 完全基于 debezium 在全量阶段就会出现以下三个痛点:

  1. debezium 在 snapshot 阶段的一致性通过对表加锁来实现,全局锁可能导致数据库被 hang 住,debezium 需要锁权限。这是个高危操作。
  2. 只支持单并发,表特别大的时候,snapshot 阶段的完成时间会是小时级别。
  3. snapshot 阶段不支持 checkpoint,也就意味着,不能中途停下来,否则又要重新 snapshot 一遍数据。

debezium 通过对 db 加锁,来实现在开始阶段和结束阶段的 binlog position 一致性,保证全量阶段和增量阶段的无缝衔接。

为了解决上面的三个痛点。Flink CDC 2.0 借鉴了 Netflix 的 DBlog paper1 和 FLINK FLIP-27 Source2 的实现,重新设计了全量阶段的处理逻辑。

FLINK FLIP-27 Source

FLIP-27 主要实现了 Source 可以按照并行度进行分片,然后并行消费数据。这就带来了极好的扩展性。它的实现由两个组件构成:

  • SplitEnumerator:发现和分配 splits(files, partitions 等等)
  • Reader: 从 splits 中读取实际的数据

其中 SplitEnumerator 是单线程分片处理器,它将 split 信息分配给空闲的 Reader 去执行,Reader 执行完后,会通知 SplitEnumerator 当前 Reader 已空闲,SplitEnumerator 再将下一个 split 给到空闲的 Reader。它的整体流程如下图所示:

SplitEnumerator 维护 splits 状态,如待分配列表、已分配但未消费完成列表,消费完成列表等,用于从 checkpoint 中重新恢复未完成的 splits 的分配工作。而 Reader 维护每个 Split 的实际消费状态,如 split 消费到的 offset 信息等。

FLIP-27 是专门为分片并行读取而设计的 source 调度框架,解决了数据 Reader 的水平扩展 的问题和读取时的 checkpoint 问题

Flink CDC2.0算法

flink CDC1.0通过开启可重复读事务,实现dump数据。通过表锁或全局锁,保证数据读取时能记下准确的binlog位置。

那在无锁的情况下,别的client就会在snapshot阶段更新某一条被同步过去的数据,造成在snapshort阶段同步过去的数据是错误的,而增量阶段又未同步到这个变更,这条数据就会一直错下去,直到下一次变更。

Netflix 的 DBLog: A Watermark Based Change-Data-Capture Framework 提供了一种解法,在无锁的情况下,实现数据的一致性问题。示例代码如下图所求:

它通过第三方 watermark 表来记录执行 select 前和后的高低水位(binlog 位置),然后读取该 chunk 的数据,再将 lwhw 之前属于 chunk 区间的数据进行更新。

如上图示例可看,在 chunk: k1~ k6 中,k1 和 k3 在 select 的时候有被更新过,于是 k1 和 k3 使用 binlog 中的数据向后 emit,select 出来的数据被抛弃,而 k2,k4,k5,k6 直接向后 emit。

Flink CDC 2.0 在实现上,对该算法做了优化,它去掉了 watermark 表方案,而是直接记录 binlog 位置来打点。使用当 show master status 获取执行 select chunk 前后的高低水位。

在MYSQL的主从复制中 ,通过命令show master status,可以查看master数据库当前正在使用的二进制日志及当前执行二进制日志位置

全增量数据处理流程

结合 FLIP-27 和 DBlog paper,表订阅首先在 JobManager 上对数据进行分片,生成 snapshot split 集合,然后 reader 就绪后,向 JM 申请消费 split,JM 发送消费的区间给到 Reader。当 Reader 完成 split 的消费之后,split 会存下当前的高水位信息,并将 split 以事件的形态回传给 JM 做状态记录。Splits 先在 JM 的待分配列表中,当分配给 Reader 消费后,会进行已分配列表,当状态回传后,split 会记录到已完成列表。

假设有一张 kv 表,共 40 条数据,每个分片取 10 条数据,它的某个状态如下所示:

当所有的分片数据消费完成后,JM 会等到 checkpoint 周期到达时(当分片只有一个时,会直接分配 binlog split),生成一个 binlog split 给到空闲的 reader 去做增量数据订阅。该 split 包含之前的所有 snapshot split 数据,并且开始订阅 binlog 的位置由所有 snapshot split 的最低 hw 决定。这里复用 kv 表 的示例说明,binlog split 由 _start_offset = min(hw1, hw2, hw3, hw4)_确定。

当发起增量数据订阅时,binlog 数据可能被处理了,也可能没被处理。比如,当拉取到一个 k23 的值变更时,程序首先会去判断当前 k23 所属的 binlog offset 是否已经大于 hw1~hw4 的最大值,如果是,说是当前已完成进入增量阶段,全量数据已完全和 db 一直。否则,判断新的 k23 的 offset 是否在 k21-k30 这个分片的 hw3 之后,如果是,则说明在完成这个分片消费后,k23 发生了变更,需要将这个变更同步给下游,让数据最终一致。如果 k23 的 binlog offset 小于 hw3,说明数据更新已在 snapshot 阶段完成,可以直接丢弃。

GTID和心跳

在实际的业务中,我们通过 Flink Join 生成大宽表,往往需要订阅一些不常更新的表,如后台管理员表。debezium 低层的订阅过滤逻辑会将非订阅表的数据丢弃处理,这会导致 flink cdc 的 state 得不到更新,在下次服务重启时,可能会拿一个已被 purge 的 binlog 位置,去重新申请数据订阅,导致服务已法正常重启。

好在,Flink CDC 提供了心跳机制,会定期将非订阅表的事件包装成心跳,去更新 state 的 binlog offset,以保证 state 数据的有效性。

之所以需要心跳机制,这是由于我们的 binlog 数据可能会被定期 purge,所以,需要我们的 state 随时保持在最新的位置上。Flink CDC 在重新恢复订阅时,会做如下逻辑处理,来判断当前 state 的 binlog 数据是否为有效的,否则就需要人工介入:

第一步:A = Checkpoint 中的 gtid。

第二步:B = 执行 show master status 获取变量 gtid_executed 的 gtid_set。

第三步:通过执行 MySQL 的 GTID_SUBSET(A, B) 判断 A 是否在 B 内,在则继续;否则 重启中断,需人工介入。

第四步:C = GTID_SUBSTRACT(B, A) 得到差集,即 B 未同步到 A 的部分。p.s. 方法 GTID_SUBSTRACT(set, subset) 返回 set 不在 subset 的 GTIDs

第五步:D = 执行 select @@global.gtid_purged 获取 gtid_purged 的 gtid_set。 第六步:通过 E = GTID_SUBSTRACT(C, D) 得到。 第七步:判断 C 和 E 是否相等:相等则正常重启,否则重启中断,需人工介入。

上图为 gtid 合法性校验,左边为正常,右边为异常情况。

Flink CDC 中 heartbeat.interval 默认是开启状态,如果不需要,也可以设置成 0s 让该功能不可用。该功能随 CDC 2.2.0 一起发布。

Last modification:January 8, 2024
如果觉得我的文章对你有用,请随意赞赏