You need to enable JavaScript to run this app.
导航
JDBC Catalog
最近更新时间:2025.03.28 15:32:06首次发布时间:2025.03.28 15:32:06
我的收藏
有用
有用
无用
无用

1. 使用限制

  • 支持的数据库:需兼容 JDBC 标准的关系型数据库(如 MySQL、PostgreSQL、Oracle 等)。
  • 权限要求:需具备目标数据库的读写权限(如建表、查询、插入等)。
  • 依赖配置:需在 Flink 环境中添加对应数据库的 JDBC 驱动依赖。
  • 功能限制
    • 暂不支持自动创建数据库。
    • 部分数据库特定功能(如存储过程)可能无法通过 Catalog 直接使用。
    • 不支持流读数据表,流式写入需结合 Flink 的 JDBC Sink 使用。

2. 使用步骤

2.1 添加 JDBC 驱动依赖

将目标数据库的 JDBC 驱动 JAR 包添加到 Flink 的依赖文件中。参考 使用 JDBC 或者 MySQL-CDC 数据源

2.2 创建 JDBC Catalog

通过 Flink SQL 或 Table API 创建 JDBC Catalog,指定数据库连接信息。

2.2.1 使用 SQL 创建 Catalog

CREATE CATALOG jdbc_catalog
WITH (
    'type' = 'jdbc',
    'default-database' = 'flink_db',
    'username' = 'root',
    'password' = '123456',
    'base-url' = 'jdbc:mysql://localhost:3306'
);

2.2.2 参数说明

  • type:Catalog 类型,固定为 jdbc
  • default-database:默认连接的数据库名。
  • username / password:数据库账号密码。
  • base-url:JDBC 连接 URL(不包含数据库名)。

2.3 使用 JDBC Catalog 操作数据

2.3.1 切换 Catalog

USE CATALOG jdbc_catalog;

2.3.2 创建表

直接在 JDBC Catalog 中创建表,表结构会同步到目标数据库:

CREATE TABLE user_behavior (
    user_id INT,
    item_id INT,
    action_time TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/flink_db',
    'table-name' = 'user_behavior'
);

2.3.3 查询数据

当前 JDBC 仅支持批读,不支持流读,SQL 示例如下:

SELECT * FROM user_behavior WHERE user_id > 100;

2.3.4 插入数据

INSERT INTO user_behavior
SELECT user_id, item_id, action_time FROM kafka_source;

3. 高级配置

3.1 连接池优化

通过 WITH 参数配置连接池:

CREATE CATALOG jdbc_catalog
WITH (
    ...
    'connection.max-retry-timeout' = '60s',
    'connection.pool.size' = '5'
);

3.2 流式写入

使用 Flink JDBC Sink 实现流式写入:

CREATE TABLE jdbc_sink (
    user_id INT,
    cnt BIGINT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/flink_db',
    'table-name' = 'user_stats',
    'sink.buffer-flush.max-rows' = '1000', -- 批量写入条数
    'sink.buffer-flush.interval' = '10s'   -- 批量写入间隔
);