本实验以DataLeap on Las为例,完成数据从datagen随机生成到mysql的数据同步。
由于现阶段DataLeap与Las服务以华北2(北京)-可用区A(cn-beijing-a)为主,以下相关的私有网络等产品都指此地域&可用区。
当前现有LAS Flink 支持的Connector见:https://www.volcengine.com/docs/6492/130252
预计部署时间:40分钟
级别:中级
相关产品:大数据开发套件、湖仓一体分析服务LAS
受众: 通用
已购买开通私有网络服务
已购买开通DataLeap产品
已购买开通湖仓一体LAS服务
子账户具备DataLeap相关权限(参考:https://www.volcengine.com/docs/6260/65408)
已购买并开通云数据库MYSQL版本服务
创建mysql实例可参考:https://www.volcengine.com/docs/6313/75366
本实验案例中,白名单主要设置私有网络的IPv4 CIDR
具体的白名单设置方法,可参考:https://www.volcengine.com/docs/6357/96144
在mysql中,通过登录控制台,创建本案例中Flink CDC Sink相关的库表
CREATE DATABASE demo;
CREATE TABLE `student_sink` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` text, `subject` text, `score` int(20) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=48796 DEFAULT CHARSET=utf8
如果没有DataLeap项目,需要先新建项目。新建项目后,进入项目中的“数据开发”,进入开发页面。
在本实验案例中,LAS Flink VPC与MYSQL VPC属不同VPC,因此需要利用Connector跨VPC访问方式,实现Flink跨VPC访问的数据访问。具体可参考:https://www.volcengine.com/docs/6492/146363。
-- 开启跨VPC访问 set las.cross.vpc.access.enabled=true; -- 指定私有 VPC ID set las.cross.vpc.vpc.id=替换自己的VPC ID; -- 指定子网 ID set las.cross.vpc.subnet.id=替换自己的子网ID; -- 指定安全组 ID set las.cross.vpc.security.group.id=替换自己的安全组; CREATE TEMPORARY TABLE student_source ( id INT, name STRING, subject STRING, score INT, primary key (id) NOT enforced ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '50' ); CREATE TEMPORARY TABLE student_sink ( id INT, name STRING, subject STRING, score INT, primary key (id) NOT enforced ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://实例:端口/demo?serverTimezone=UTC&useSSL=false', 'username' = '账户', 'password' = '密码', 'table-name' = 'student_sink' ); INSERT INTO student_sink SELECT * FROM student_source
在本实验中,用到了Flink提供的随机数据生成Connector:datagen,该Connector 选项参数详情如下:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/datagen/
选项 | 要求 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector | 必填 | String | 指定要使用的连接器,本例子中指定datagen | |
rows-per-second | 选填 | 10000 | Long | 每秒发送数据的速度 |
number-of-rows | 选填 | Long | 要发送的行总数,默认情况下,是无界的 | |
fields.#.kind | 选填 | random | String | sequence或random |
fields.#.min | 选填 | 随机生成器的最小值,适用于数字类型 | ||
fields.#.max | 选填 | 随机生成器的最大值,适用于数字类型 | ||
fields.#.max-past | 选填 | 0 | Duration | Maximum past of timestamp random generator, only works for timestamp types. |
fields.#.length | 选填 | 100 | Integer | 用于生成char/varchar/string/array/map/multiset类型的集合的大小或长度 |
fields.#.start | 选填 | 序列生成器的起始值 | ||
fields.#.end | 选填 | 序列生成器的最终值 |
填写运行参数
调试并提交
提交记录在LAS的“作业管理”标签中可以查询。
作业提交后,需检查作业的运行状态是否正常,提交日志、执行日志是否有异常日志打印。如发现异常无法处理,请联系客服获取帮助。
登录MYSQ客户端,检查数据写入情况。
关于LAS的作业编写与调试,可参考:https://www.volcengine.com/docs/6260/80007