将目标数据库的 JDBC 驱动 JAR 包添加到 Flink 的依赖文件中。参考 使用 JDBC 或者 MySQL-CDC 数据源。
通过 Flink SQL 或 Table API 创建 JDBC Catalog,指定数据库连接信息。
CREATE CATALOG jdbc_catalog WITH ( 'type' = 'jdbc', 'default-database' = 'flink_db', 'username' = 'root', 'password' = '123456', 'base-url' = 'jdbc:mysql://localhost:3306' );
type
:Catalog 类型,固定为 jdbc
。default-database
:默认连接的数据库名。username
/ password
:数据库账号密码。base-url
:JDBC 连接 URL(不包含数据库名)。USE CATALOG jdbc_catalog;
直接在 JDBC Catalog 中创建表,表结构会同步到目标数据库:
CREATE TABLE user_behavior ( user_id INT, item_id INT, action_time TIMESTAMP(3) ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/flink_db', 'table-name' = 'user_behavior' );
当前 JDBC 仅支持批读,不支持流读,SQL 示例如下:
SELECT * FROM user_behavior WHERE user_id > 100;
INSERT INTO user_behavior SELECT user_id, item_id, action_time FROM kafka_source;
通过 WITH
参数配置连接池:
CREATE CATALOG jdbc_catalog WITH ( ... 'connection.max-retry-timeout' = '60s', 'connection.pool.size' = '5' );
使用 Flink JDBC Sink 实现流式写入:
CREATE TABLE jdbc_sink ( user_id INT, cnt BIGINT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/flink_db', 'table-name' = 'user_stats', 'sink.buffer-flush.max-rows' = '1000', -- 批量写入条数 'sink.buffer-flush.interval' = '10s' -- 批量写入间隔 );