Kafka 跨集群数据同步是指在两个或多个不同的 Kafka 集群之间同步数据的过程,在容灾、跨云同步等场景下被广泛使用。它的主要用途是在多个数据中心、地理位置或集群之间实现数据同步和冗余,可以提升数据的高可用性和容灾能力。
目前在跨集群数据同步时,一般使用开源 Kafka 自带的 MirrorMaker 工具。但是 MirrorMaker 存在部署、监控、告警、运维成本较高,以及可扩展性较差的问题。
为了解决上述问题,我们在火山 Flink 上支持了 Kafka 跨集群数据同步,通过简单的 SQL 代码配置,就可以开始进行跨集群多 topic 的数据同步。同时,火山 Flink 是全托管一站式实时计算平台,提供全托管 Flink 服务,操作简单、开箱即用免运维,使用和运维成本很低。
我们对 MirrorMaker 和火山 Flink 在跨集群数据同步场景进行了对比:
功能点 | MirrorMaker | 火山 Flink |
---|---|---|
部署成本 | 部署成本很高,在云上需要基于 ECS 自建一套 MirrorMaker。 | 部署成本极低,火山 Flink 提供全托管 Flink 服务,开箱即用。 |
监控 | 监控难度大,如果要监控 MirrorMaker 的指标,则需要搭建 Prometheus + Grafana,成本较高,而且不便于维护。 | 基于火山云监控提供全托管的监控服务,用户可以在 Flink 作业的数据曲线查看作业运行状况的监控。 |
告警 | 缺少告警能力,MirrorMaker 不支持发送告警,导致在数据延迟时,或 MirrorMaker 出现故障或退出时,无法及时给运维人员推送报警。 | 基于火山云监控提供全托管的告警服务,用户可以在告警配置里,针对作业的各个维度的告警,包括 Kafka 业务延迟、运行状态、Checkpoint 等维度。 |
运维 | 运维成本高,如果 MirrorMaker 出现故障,可能需要手动重启,有一定的运维成本。 | 运维成本低,火山 Flink 本身具有 failover 机制,在数据同步出现异常的情况下,会通过 failover 机制自动恢复,无需人工运维。 |
可扩展性 | 可扩展性较差,在单机资源无法满足跨集群实时同步需求时,需要基于 ECS 来搭建多节点的 MirrorMaker 集群,搭建过程比较繁琐,同时,如果后续还有扩容需求,则需要手动新开节点,并添加到原有集群,无法做到资源的弹性扩容能力。 | 可扩展性高,火山 Flink 自定义指定并发度,在同步数据量增大的情况下,通过扩并发快速提高数据同步能力,其中资源池扩容分钟级别,作业改并发重启分钟级别。 |
易用性 | 易用性较差,需要基于 Shell 脚本部署和运维。 | 易用性高,火山 Flink 提供可视化的开发界面,开发和运维更加便捷。 |
客户支持 | 开源产品,需要自己维护。如果使用过程中出现问题,需要反馈给开源社区,修复周期较长。 | 火山 Flink 迭代更快,bugfix 修复速度更快,稳定性更好。 |
为保证网络访问安全,本文所使用的 Kafka 资源需要通过公网或者专线等方式进行网络联通。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络和创建子网。使用 Flink 任务访问外网/跨 VPC 访问方式请参考 Flink 访问公网。
新建 Flink Stream SQL 任务,并选择 Flink 1.16-volcano 版本,在任务编辑区编写 SQL,模版如下所示。
该 SQL 主要逻辑是先定义单个 Kafka Source 读取上游 Kafka 集群的所有所需的 topics,采用 raw 格式分别读取原始消息的 key 和 value,再定义单个 Kafka Sink 把原始消息的 key 和 value 写入到下游 Kafka 集群对应的 topic 中。topic 路由是通过 Kafka Source 和 Sink 的元数据字段 topic 来实现的,Source 获取 topic 元数据,Sink 写入时,根据该元数据,路由到指定的 topic。
为了确保上下游分区在同步时一一对应,确保消息的有序性和正确性,需要设置并发数为所有 topic 的最大分区数的整数倍,比如 topic-1 分区数为 12,topic-2 分区数为 24,则设置作业并发为 24,或者 24 的整数倍。
CREATE TABLE kafka_source ( `topic` STRING METADATA, -- 定义 topic 元数据,用于提供给下游根据 topic name 写入对应的 topic `msg_key` VARBINARY, -- 定义消息 key,这次采用 raw 格式读取原始数据,字段类型需要为 VARBINARY `msg_value` VARBINARY -- 定义消息 value,这次采用 raw 格式读取原始数据,字段类型需要为 VARBINARY ) WITH ( 'connector' = 'kafka', 'topic' = 'topic1;topic2', -- 指定要同步的上游 topic 列表 'properties.bootstrap.servers' = 'kafka-xxx-cluster-1.kafka.ivolces.com:9092', 'properties.group.id' = 'test', 'key.format' = 'raw', -- 采用 raw 格式分别读取原始消息的 key 和 value 'key.fields' = 'msg_key', 'value.format' = 'raw', 'value.fields-include' = 'EXCEPT_KEY', 'scan.startup.mode' = 'latest-offset' ); CREATE TABLE kafka_sink ( `topic` STRING METADATA, -- 定义 topic 元数据,使得可以根据 topic name 写入对应的 topic `msg_key` VARBINARY, -- 定义消息 key,这次采用 raw 格式写入原始数据,字段类型需要为 VARBINARY `msg_value` VARBINARY -- 定义消息 value,这次采用 raw 格式写入原始数据,字段类型需要为 VARBINARY ) WITH ( 'connector' = 'kafka', 'topic' = 'topic1;topic2', -- 指定要写入的 topic 列表 'properties.bootstrap.servers' = 'kafka-xxx-cluster-2.kafka.ivolces.com:9092', 'key.format' = 'raw', -- 采用 raw 格式分别写入原始消息的 key 和 value 'key.fields' = 'msg_key', 'value.format' = 'raw', 'value.fields-include' = 'EXCEPT_KEY', 'properties.enable.idempotence' = 'false', --关闭事务写 'properties.linger.ms' = '500', -- 攒批时间 500ms,减小写入压力 'properties.batch.size' = '5242880', -- 攒批大小 5MB 'properties.retries' = '10', -- 设置重试次数为 10,提高稳定性 'properties.send.buffer.bytes' = '5242880', 'properties.max.in.flight.requests.per.connection' = '150', 'properties.request.timeout.ms' = '120000', 'properties.buffer.memory' = '67108864', 'properties.delivery.timeout.ms' = '3000000', 'sink.partitioner' = 'fixed' -- 使用固定分区策略,确保每个 Flink 并发写入对应的 Kafka topic 分区 ); -- 上游 topic 原样同步到下游同名的 topic INSERT INTO kafka_sink SELECT topic, msg_key, msg_value FROM kafka_source;
需要注意的是,Source 和 Sink 除了 topic 可以定义多个 topic 列表外,还支持 topicPattern,来使用正则表达式匹配所需的 topic,参数说明如下:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
topic | 可选 | (无) | String | 当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 列表,如 |
topic-pattern | 可选 | (无) | String | 用于读取或写入的 topic 名称模式的正则表达式。所有匹配指定正则表达式的 topic 名称将在作业开始运行时被消费者订阅。对于 sink 来说, |
如果在 Kafka Topic 数据跨集群同步过程中,需要调整 Topic 的名称,可以参考如下 SQL 示例对于 INSERT 语句进行调整:
-- 上游 topic 和下游 topic 有映射规则,比如上游 topic-1 映射到下游的 prefix-topic-1 -- 需要修改的地方为: -- 1. Kafka sink topic 列表改为 'topic' = 'prefix-test-1;test-2-suffix;test-3' -- 2. 采用 CASE WHEN 来定义上下游 topic 的映射规则 INSERT INTO Kafka_sink SELECT CASE WHEN topic = 'topic-1' THEN 'prefix-topic-1' -- 对 topic-1 添加前缀,写入到下游集群的 topic 名称是 prefix-topic-1 WHEN topic = 'topic-2' THEN concat('topic-2', '-suffix') -- 对 topic-2 添加后缀,写入到下游集群的 topic 名称是 topic-2-suffix ELSE topic END AS topic, -- 其余的 topic 名称原样同步到下游同名的 topic msg_key, msg_value FROM Kafka_source;
系统会提示任务上线成功,可以前往任务管理页面查看。
在上游 topic 扩分区场景下,需要先停止 Flink 同步作业,给下游对应 topic 扩分区,完成后,给 Flink 作业扩并发为当前最大的 topic 分区数后,从最新状态重启 Flink 作业。重启完成后,Flink 作业能一一映射上下游分区。
当前下游写入采用固定分区策略,如果业务上不要求同步时,分区是一一对应的,那么下游写入可以选择使用默认分区策略。默认分区策略对没有消息键的消息使用粘性分区策略(sticky partition strategy)进行分区,对含有消息键的消息使用 murmur2 哈希算法计算分区。
如果上游 topic 分区不要求一一对应,且 topic 分区有倾斜,推荐使用默认分区策略,使得下游 topic 各分区数据是均匀的。默认分区策略设置方法为,在 Kafka Sink DDL 里,去掉 'sink.partitioner' 相关的配置,则默认就会使用该分区策略。
如果当前您正在使用 MirrorMaker 进行 Kafka 的数据流量同步,如果您想要稳定的迁移到火山 Flink,可以参考如下的步骤:
建议:建议 MirroMaker 和 Flink SQL 使用同样的消费者组。并且 Flink SQL 中设置 Kafka 的启动模式为
'scan.startup.mode' = 'group-offset'
。这样可以保证 Flink SQL 从 MirroMaker 消费的 Offset 继续进行消费。
在 MirrorMaker 迁移火山 Flink 时,可以采用常用的灰度发布策略。灰度发布允许我们将迁移过程分为多个阶段,逐步将流量引导到新系统或新的 Kafka 集群中,以最小化风险和潜在的影响。
具体是先把少量 topic 切换到火山 Flink 进行同步。观察一段时间,确认稳定性、性能满足要求后,逐步增加灰度的 topic 范围。建议的灰度比例为 5% -> 20% -> 50% -> 100%,逐步增加迁移比例。
火山 Flink 支持在给 Source 的 topic list 增加新的 topic,从最新状态恢复时,作业的存量 topic 从历史状态恢复,新增的 topic 从 SQL 配置的参数启动,比如 SQL 里配置了 group-offset 模式,则从该模式启动。
监控指标:
在迁移过程中如果发现问题,需要快速回滚到 MirrorMaker。具体回滚步骤:
流量通过灰度发布的方式,逐步把所有 topic 的流量都切换到火山 Flink。
根据实际的性能测试,在消息大小为每条 1KB 的情况下,火山 Flink 单 CU 数据处理能力为 2w QPS,单 CU 数据速率为 20MB/s。测试过程中,作业运行稳定,上下游 Kafka 集群运行稳定。