You need to enable JavaScript to run this app.
导航
Kafka 跨云/集群同步
最近更新时间:2024.12.05 13:36:58首次发布时间:2024.11.15 15:20:02

场景介绍

Kafka 跨集群数据同步是指在两个或多个不同的 Kafka 集群之间同步数据的过程,在容灾、跨云同步等场景下被广泛使用。它的主要用途是在多个数据中心、地理位置或集群之间实现数据同步和冗余,可以提升数据的高可用性和容灾能力。

目前在跨集群数据同步时,一般使用开源 Kafka 自带的 MirrorMaker 工具。但是 MirrorMaker 存在部署、监控、告警、运维成本较高,以及可扩展性较差的问题。
为了解决上述问题,我们在火山 Flink 上支持了 Kafka 跨集群数据同步,通过简单的 SQL 代码配置,就可以开始进行跨集群多 topic 的数据同步。同时,火山 Flink 是全托管一站式实时计算平台,提供全托管 Flink 服务,操作简单、开箱即用免运维,使用和运维成本很低。
我们对 MirrorMaker 和火山 Flink 在跨集群数据同步场景进行了对比:

功能点

MirrorMaker

火山 Flink

部署成本

部署成本很高,在云上需要基于 ECS 自建一套 MirrorMaker。

部署成本极低,火山 Flink 提供全托管 Flink 服务,开箱即用。

监控

监控难度大,如果要监控 MirrorMaker 的指标,则需要搭建 Prometheus + Grafana,成本较高,而且不便于维护。

基于火山云监控提供全托管的监控服务,用户可以在 Flink 作业的数据曲线查看作业运行状况的监控。

告警

缺少告警能力,MirrorMaker 不支持发送告警,导致在数据延迟时,或 MirrorMaker 出现故障或退出时,无法及时给运维人员推送报警。

基于火山云监控提供全托管的告警服务,用户可以在告警配置里,针对作业的各个维度的告警,包括 Kafka 业务延迟、运行状态、Checkpoint 等维度。

运维

运维成本高,如果 MirrorMaker 出现故障,可能需要手动重启,有一定的运维成本。

运维成本低,火山 Flink 本身具有 failover 机制,在数据同步出现异常的情况下,会通过 failover 机制自动恢复,无需人工运维。

可扩展性

可扩展性较差,在单机资源无法满足跨集群实时同步需求时,需要基于 ECS 来搭建多节点的 MirrorMaker 集群,搭建过程比较繁琐,同时,如果后续还有扩容需求,则需要手动新开节点,并添加到原有集群,无法做到资源的弹性扩容能力。

可扩展性高,火山 Flink 自定义指定并发度,在同步数据量增大的情况下,通过扩并发快速提高数据同步能力,其中资源池扩容分钟级别,作业改并发重启分钟级别。

易用性

易用性较差,需要基于 Shell 脚本部署和运维。

易用性高,火山 Flink 提供可视化的开发界面,开发和运维更加便捷。

客户支持

开源产品,需要自己维护。如果使用过程中出现问题,需要反馈给开源社区,修复周期较长。

火山 Flink 迭代更快,bugfix 修复速度更快,稳定性更好。

前提条件

为保证网络访问安全,本文所使用的 Kafka 资源需要通过公网或者专线等方式进行网络联通。
您可以在创建云资源前,先创建私有网络。相关文档,请参见创建私有网络创建子网。使用 Flink 任务访问外网/跨 VPC 访问方式请参考 Flink 访问公网

新建 Flink Stream SQL 任务,并选择 Flink 1.16-volcano 版本,在任务编辑区编写 SQL,模版如下所示。
该 SQL 主要逻辑是先定义单个 Kafka Source 读取上游 Kafka 集群的所有所需的 topics,采用 raw 格式分别读取原始消息的 key 和 value,再定义单个 Kafka Sink 把原始消息的 key 和 value 写入到下游 Kafka 集群对应的 topic 中。topic 路由是通过 Kafka Source 和 Sink 的元数据字段 topic 来实现的,Source 获取 topic 元数据,Sink 写入时,根据该元数据,路由到指定的 topic。
为了确保上下游分区在同步时一一对应,确保消息的有序性和正确性,需要设置并发数为所有 topic 的最大分区数的整数倍,比如 topic-1 分区数为 12,topic-2 分区数为 24,则设置作业并发为 24,或者 24 的整数倍。

CREATE TABLE
  kafka_source (
    `topic` STRING METADATA, -- 定义 topic 元数据,用于提供给下游根据 topic name 写入对应的 topic
    `msg_key` VARBINARY, -- 定义消息 key,这次采用 raw 格式读取原始数据,字段类型需要为 VARBINARY
    `msg_value` VARBINARY -- 定义消息 value,这次采用 raw 格式读取原始数据,字段类型需要为 VARBINARY
  )
WITH
  (
    'connector' = 'kafka',
    'topic' = 'topic1;topic2', -- 指定要同步的上游 topic 列表
    'properties.bootstrap.servers' = 'kafka-xxx-cluster-1.kafka.ivolces.com:9092',
    'properties.group.id' = 'test',
    'key.format' = 'raw', -- 采用 raw 格式分别读取原始消息的 key 和 value
    'key.fields' = 'msg_key',
    'value.format' = 'raw',
    'value.fields-include' = 'EXCEPT_KEY',
    'scan.startup.mode' = 'latest-offset'
  );

CREATE TABLE
  kafka_sink (
    `topic` STRING METADATA, -- 定义 topic 元数据,使得可以根据 topic name 写入对应的 topic
    `msg_key` VARBINARY, -- 定义消息 key,这次采用 raw 格式写入原始数据,字段类型需要为 VARBINARY
    `msg_value` VARBINARY -- 定义消息 value,这次采用 raw 格式写入原始数据,字段类型需要为 VARBINARY
  )
WITH
  (
    'connector' = 'kafka',
    'topic' = 'topic1;topic2', -- 指定要写入的 topic 列表
    'properties.bootstrap.servers' = 'kafka-xxx-cluster-2.kafka.ivolces.com:9092',
    'key.format' = 'raw', -- 采用 raw 格式分别写入原始消息的 key 和 value
    'key.fields' = 'msg_key',
    'value.format' = 'raw',
    'value.fields-include' = 'EXCEPT_KEY',
    'properties.enable.idempotence' = 'false', --关闭事务写
    'properties.linger.ms' = '500', -- 攒批时间 500ms,减小写入压力
    'properties.batch.size' = '5242880', -- 攒批大小 5MB
    'properties.retries' = '10', -- 设置重试次数为 10,提高稳定性
    'properties.send.buffer.bytes' = '5242880',
    'properties.max.in.flight.requests.per.connection' = '150',
    'properties.request.timeout.ms' = '120000',
    'properties.buffer.memory' = '67108864',
    'properties.delivery.timeout.ms' = '3000000',
    'sink.partitioner' = 'fixed' -- 使用固定分区策略,确保每个 Flink 并发写入对应的 Kafka topic 分区
  );

-- 上游 topic 原样同步到下游同名的 topic
INSERT INTO
  kafka_sink
SELECT
  topic,
  msg_key,
  msg_value
FROM
  kafka_source;

需要注意的是,Source 和 Sink 除了 topic 可以定义多个 topic 列表外,还支持 topicPattern,来使用正则表达式匹配所需的 topic,参数说明如下:

参数

是否必选

默认值

数据类型

描述

topic

可选

(无)

String

当表用作 source 时读取数据的 topic 名,或当表用作 sink 时写入的 topic 名。它还支持通过分号分隔的 topic 列表,如 'topic-1;topic-2' 来作为 source 的 topic 列表。注意,“topic-pattern”和“topic”只能指定其中一个。对于 sink 来说,topic 名是写入数据的 topic。它还支持 sink 的 topic 列表。提供的 topic 列表被视为 topic 元数据列的有效值的允许列表。如果提供了列表,对于 sink 表,“topic”元数据列是可写的并且必须指定。

topic-pattern

可选

(无)

String

用于读取或写入的 topic 名称模式的正则表达式。所有匹配指定正则表达式的 topic 名称将在作业开始运行时被消费者订阅。对于 sink 来说,topic 元数据列是可写的,必须提供并且与 topic-pattern 正则表达式匹配。注意,“topic-pattern”和“topic”只能指定其中一个。

步骤二:设置上下游 Topic 映射关系(按需)

如果在 Kafka Topic 数据跨集群同步过程中,需要调整 Topic 的名称,可以参考如下 SQL 示例对于 INSERT 语句进行调整:

-- 上游 topic 和下游 topic 有映射规则,比如上游 topic-1 映射到下游的 prefix-topic-1
-- 需要修改的地方为:
--     1. Kafka sink topic 列表改为 'topic' = 'prefix-test-1;test-2-suffix;test-3'
--     2. 采用 CASE WHEN 来定义上下游 topic 的映射规则
INSERT INTO
  Kafka_sink
SELECT
    CASE WHEN topic = 'topic-1' THEN 'prefix-topic-1' -- 对 topic-1 添加前缀,写入到下游集群的 topic 名称是 prefix-topic-1
         WHEN topic = 'topic-2' THEN concat('topic-2', '-suffix') -- 对 topic-2 添加后缀,写入到下游集群的 topic 名称是 topic-2-suffix
         ELSE topic END AS topic, -- 其余的 topic 名称原样同步到下游同名的 topic
  msg_key,
  msg_value
FROM
  Kafka_source;

  1. 在任务编辑区,单击上方的上线按钮。
  2. 在任务上线设置对话框,选择 Flink 资源池、设置任务优先级和调度策略,然后单击确定。

系统会提示任务上线成功,可以前往任务管理页面查看。

  1. 在任务管理页面,点击启动该任务。

注意事项

上游扩分区处理方法

在上游 topic 扩分区场景下,需要先停止 Flink 同步作业,给下游对应 topic 扩分区,完成后,给 Flink 作业扩并发为当前最大的 topic 分区数后,从最新状态重启 Flink 作业。重启完成后,Flink 作业能一一映射上下游分区。

选择其它分区策略

当前下游写入采用固定分区策略,如果业务上不要求同步时,分区是一一对应的,那么下游写入可以选择使用默认分区策略。默认分区策略对没有消息键的消息使用粘性分区策略(sticky partition strategy)进行分区,对含有消息键的消息使用 murmur2 哈希算法计算分区。
如果上游 topic 分区不要求一一对应,且 topic 分区有倾斜,推荐使用默认分区策略,使得下游 topic 各分区数据是均匀的。默认分区策略设置方法为,在 Kafka Sink DDL 里,去掉 'sink.partitioner' 相关的配置,则默认就会使用该分区策略。

如果当前您正在使用 MirrorMaker 进行 Kafka 的数据流量同步,如果您想要稳定的迁移到火山 Flink,可以参考如下的步骤:

迁移步骤

  1. 获取 MirrorMaker 配置,主要配置包括配置源和目标 Kafka 集群的连接信息、允许同步 topic 的正则表达式、是否刷新 topic 列表等。
  2. 基于 Flink SQL 模版填写配置:基于上文提供的 Flink SQL 模版,把 MirrorMaker 的配置填写到对应的参数里。

建议:建议 MirroMaker 和 Flink SQL 使用同样的消费者组。并且 Flink SQL 中设置 Kafka 的启动模式为 'scan.startup.mode' = 'group-offset'。这样可以保证 Flink SQL 从 MirroMaker 消费的 Offset 继续进行消费。

  1. 启动 Flink 作业,并观察作业的运行监控,以及集群的运行监控。

双跑方案

在 MirrorMaker 迁移火山 Flink 时,可以采用常用的灰度发布策略。灰度发布允许我们将迁移过程分为多个阶段,逐步将流量引导到新系统或新的 Kafka 集群中,以最小化风险和潜在的影响。
具体是先把少量 topic 切换到火山 Flink 进行同步。观察一段时间,确认稳定性、性能满足要求后,逐步增加灰度的 topic 范围。建议的灰度比例为 5% -> 20% -> 50% -> 100%,逐步增加迁移比例。
火山 Flink 支持在给 Source 的 topic list 增加新的 topic,从最新状态恢复时,作业的存量 topic 从历史状态恢复,新增的 topic 从 SQL 配置的参数启动,比如 SQL 里配置了 group-offset 模式,则从该模式启动。
监控指标:

  • Flink 作业稳定性指标(failover 情况、负载情况)、性能指标(QPS、延迟)。
  • 上下游 Kafka 集群指标(集群负载、QPS、吞吐)。

回滚方案

在迁移过程中如果发现问题,需要快速回滚到 MirrorMaker。具体回滚步骤:

  1. 停止 Flink 作业。
  2. 把需要回滚的 topic 添加到 MirrorMaker 白名单,重启 MirrorMaker,从上游集群的消费者组的 消费 offset 开始消费。

割接方案

流量通过灰度发布的方式,逐步把所有 topic 的流量都切换到火山 Flink。

性能说明

根据实际的性能测试,在消息大小为每条 1KB 的情况下,火山 Flink 单 CU 数据处理能力为 2w QPS,单 CU 数据速率为 20MB/s。测试过程中,作业运行稳定,上下游 Kafka 集群运行稳定。