JDBC 连接器提供了对 MySQL、PostgreSQL 等常见的关系型数据库的读写能力,支持做数据源表、结果表和维表。
如果您需要使用 JDBC 连接器连接云数据库 veDB MySQL 版,您的连接终端请按照以下要求配置,否则可能会因为自定义连接终端的限制而出现任务故障。
如需详细了解各参数含义,请参见编辑连接终端。
CREATE TABLE jdbc_source ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'Passord', 'scan.partition.column' = 'score', 'scan.partition.num' = '2', 'scan.partition.lower-bound' = '0', 'scan.partition.upper-bound' = '100', 'scan.fetch-size' = '1' );
CREATE TABLE jdbc_sink ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'Pa$$w0rd', 'sink.buffer-flush.max-rows' = '100' );
CREATE TABLE jdbc_lookup ( name String, score INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://192.*.*.*:3306/doc_db', 'table-name' = '<yourtablename>', 'username' = 'admin', 'password' = 'MyPa$$w0rd', 'lookup.cache.max-rows' = '100', 'lookup.max-retries' = '3' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 JDBC 连接器。 |
url | 是 | (none) | String | JDBC 数据库的连接地址。 |
table-name | 是 | (none) | String | 表名。 |
driver | 否 | (none) | String | JDBC Driver 的类名。如果不设置,则自动从 url 参数值中提取。 |
username | 否 | (none) | String | 数据库登录用户名和密码。使用时,两者必须同时使用。 |
password | 否 | (none) | String | 数据库登录用户名和密码。使用时,两者必须同时使用。 |
scan.partition.column | 否 | (none) | String | 指定分区扫描(Partitioned Scan)的列名,该列必须是数值类型、日期类型、时间戳类型等。 |
scan.partition.num | 否 | (none) | Integer | 分区扫描启用后,指定分区数。 |
scan.partition.lower-bound | 否 | (none) | Integer | 分区扫描启用后,指定首个分区的最小值。 |
scan.partition.upper-bound | 否 | (none) | Integer | 分区扫描启用后,指定最后一个分区的最大值。 |
scan.fetch-size | 否 | 0 | Integer | 读取数据时,批量读取的行数。 |
sink.buffer-flush.max-rows | 否 | 100 | Integer | 批量输出时,缓存中最多缓存多少数据。如果设置为 0,表示禁止输出缓存。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 批量输出时,每批次最大的间隔,单位毫秒。 |
sink.max-retries | 否 | 3 | Integer | 数据写入失败时,最大重试次数。 |
lookup.cache.max-rows | 否 | (none) | Integer | 查询缓存(Lookup Cache)中最多缓存的数据条数。 |
lookup.cache.ttl | 否 | (none) | Duration | 查询缓存中每条记录最长的缓存时间。 |
lookup.max-retries | 否 | 3 | Integer | 数据库查询失败时,最多重试的次数。 |
lookup.cache | 是 | NONE | String | 维表的 cache 策略,可选:
|
lookup.full-cache.reload-strategy | 否 | PERIODIC | String | 定义使用哪种策略来重新加载 Full Cache 缓存:
|
lookup.full-cache.periodic-reload.interval | 否 | (none) | Duration | 采用 PERIODIC 加载策略下的 Full Cache 重新加载间隔。 |
lookup.full-cache.periodic-reload.schedule-mode | 否 | FIXED_DELAY | String | 定义在 PERIODIC 加载策略下的调度模式: |
lookup.full-cache.timed-reload.iso-time | 否 | (none) | String | 在 TIMED 加载策略下的 Full Cache 加载时间,时间采用 ISO-8601 格式。 |
lookup.full-cache.timed-reload.interval-in-days | 否 | 1 | Integer | 在 TIMED 加载策略下的 Full Cache 加载间隔,间隔是一天的倍数,设置为 1 则表示间隔 1 天加载一次,设置为 2 则表示间隔 2 天加载一次。 |
结果表。
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5' ); CREATE TABLE jdbc_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 'username' = 'admin', 'password' = '***', 'table-name' = 'orders' ); insert into jdbc_sink select * from datagen_source;
源表。
CREATE TABLE jdbc_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://mysq**3fb.rds.ivolces.com:3306/doc_db', 'username' = 'admin', 'password' = 'Passord', 'table-name' = 'source_table' ); CREATE TABLE print_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'print' ); insert into print_sink select * from jdbc_source;