流式计算 Flink 基于 Flink CDC,通过开发 YAML 作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文将为您介绍 Flink CDC 作业开发的操作步骤。
项目管理员(Project_Admin)已经在项目内创建好 Flink 资源池,请参见创建资源池。
本文将 Flink CDC 任务开发流程总体分为了两个阶段,分别是开发阶段和运维阶段,每个阶段对应的成员及主要工作均不相同。
配置 | 说明 |
---|---|
作业名称 | 自定义设置作业的名称。 |
作业类型 | 选择 作业类型 > Flink CDC。 |
模板选择 | 支持选择数据源(Source)和数据下游(Sink)的类型。 |
数据路由 | CDC 模板中增加路由模块。路由(Route)指定了匹配一系列源表并映射到目标表(汇表)的规则。最典型的应用场景是子数据库和子表的合并,即将多个上游源表路由到同一个目标表(汇表)。 |
数据转换 | CDC 模板中增加数据转换模块。转换模块可帮助用户根据表中的数据列来删除和扩展数据列。此外,它还能帮助用户在同步过程中过滤一些不必要的数据。 |
存储位置 | 系统提供文件夹管理功能,用于分类管理任务。在体验任务开发过程中,您可以直接选择系统默认存在的数据开发文件夹。 |
引擎版本 | 目前支持 Flink 1.16-volcano、Flink 1.17-volcano 版本,请按需选择。 |
任务描述 | 输入任务的描述语句,一般描述任务实现的功能。 |
在任务编辑区编辑 CDC 任务的业务逻辑代码。
此处提供一个示例 CDC YAML 任务代码。代码含义为:新建一个 MySQL 类型的数据源,将 app_db 库下所有数据表同步到 doris 数据下游。并且对数据表做一些路由和字段转换工作。
source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* sink: type: doris fenodes: 127.0.0.1:8030 username: root password: "" transform: - source-table: adb.web_order01 projection: \*, UPPER(product_name) as product_name filter: id > 10 AND order_id > 100 description: project fields and filter - source-table: adb.web_order02 projection: \*, UPPER(product_name) as product_name filter: id > 20 AND order_id > 200 description: project fields and filter route: - source-table: app_db.orders sink-table: ods_db.ods_orders - source-table: app_db.shipments sink-table: ods_db.ods_shipments - source-table: app_db.products sink-table: ods_db.ods_products pipeline: name: Sync MySQL Database to Doris parallelism: 2
参考 使用 JDBC 或者 MySQL-CDC 数据源上传 mysql-connector-java 到依赖文件中
success
。开发与生产环境隔离,当任务开发人员完成任务开发和调试后,可以将任务上线到生产环境。
在作业开发栏目下查找并单击目标任务,然后在编辑区上方选择正确的执行方式和引擎版本,再单击上线。
在任务上线设置对话框,选择运行资源池、设置任务优先级和调度策略,然后单击确定。
系统会提示任务上线成功,可以前往任务管理页面查看。
配置 | 说明 |
---|---|
运行资源池 | 从下拉列表中选择任务运行的 Flink 资源池。 注意 如果您提交的任务开启了自动调优,则必须运行在按量付费类型的资源池。 |
任务优先级 | 系统默认预置的优先级为 L3,您可以按需设置任务优先级,数字越小优先级越高。 |
调度策略 | 根据需求配置任务调度策略:
|
调度时长 | 设置为 GANG 调度策略时,需要设置调度时长。调度时长表示再次调度的时间间隔,即任务拉起不成功会再次重试调度。 |
更多配置 | CDC 任务支持在上线前跳过深度检查,允许任务强制上线。 |
开发与生产环境隔离,任务开发人员将任务上线到生产环境后,由运维人员启动任务。
在项目左侧导航栏选择任务运维 > 任务管理。
在任务列表页面,单击操作列中的启动。
在启动任务对话框,选择任务启动方式,然后单击确定。
任务启动需要一定时长,请耐心等待。启动成功后,状态为运行中。
配置 | 说明 |
---|---|
启动方式 | 请根据实际情况选择任务启动方式:
说明 首次上线的任务,只能是全新启动方式。 |
参数配置 | 任务携带在开发侧的并行度、Task Manager 和 Job Manager 的资源配置。在启动任务时支持您更新配置并快速生效。 说明 更新参数配置并启动任务后,将新增一个任务版本,并将最新配置同步到任务开发侧。
|
更多设置 | 在任务开发变更时新增或修改算子,可能会导致任务无法从快照恢复,此时您可以选择启用允许忽略部分算子状态功能,保证任务能正常运行。 注意
|
任务在生产环境上正常运行后,您可以在 Flink UI 界面了解任务运行、TaskManager、JobManager 的详细信息。