You need to enable JavaScript to run this app.
导航
配置 PostgreSQL 数据源
最近更新时间:2024.12.05 15:47:25首次发布时间:2023.03.17 16:07:31

PostgreSQL 是一种常用的关系型数据库,数据集成同步任务为您提供读取和写入 PostgreSQL 数据源的双向通道能力。
本文为您介绍 DataSail 的 PostgreSQL 数据源配置、同步任务可视化和脚本模式(DSL)配置能力,实现与不同数据源的数据互通能力。

1 支持的 PostgreSQL 版本

  • 离线读写
    目前支持读写的 PostgreSQL 可选版本为 PostgreSQL 10、11、12、13、14。
    您可以在数据库中执行以下语句,查看 PostgreSQL 数据库的版本:
    show server_version
    

2 使用前提

  1. 子账号新建数据源时,需要有项目的管理员角色,方可以进行新建数据源操作。各角色对应权限说明,详见:管理成员

  2. 确保集成同步任务使用的独享数据集成资源组,具有 PostgreSQL 数据库节点的网络访问能力。网络互通方案详见网络连通解决方案

    • 数据源为 RDS 云数据库实例时,需要将集成资源组所在 VPC 中的 IPv4 CIDR 地址添加到 PostgreSQL 访问白名单中:
      1. 确认集成资源组所在的 VPC:
        Image
      2. 查看 VPC 的 IPv4 CIDR 地址:

        注意

        若考虑安全因素,减少 IP CIDR 的访问范围,您至少需要将集成资源组绑定的子网下的 IPv4 CIDR 地址加入到数据库白名单中。

        Image
      3. 将获取到的 IPv4 CIDR 地址添加进 PostgreSQL 数据库白名单中,添加操作详见创建白名单
        Image
    • 数据源为公网自建数据源,需通过公网形式访问:
      1. 集成资源组开通公网访问能力,操作详见开通公网
      2. 并将公网 IP 地址,添加进 PostgreSQL 数据库白名单中。
  3. 数据源配置时填写的用户名信息,需确保有 replication 角色权限,您可通过以下方式确认:

    select userepl from pg_user where usename='xxx';
    
    -- 预期返回结果为 True,返回 False 则表示无权限,您可以通过如下语句进行授权:
    ALTER USER <user> REPLICATION;
    
  4. 为了支持 PostgreSQL 实时整库解决方案在全量同步时读取从库,增量同步时读取主库(Binlog 必须从主库读取),需要在 PostgreSQL 数据源配置时,新增高级参数 slave_host,其对应参数值为从库域名信息。

    说明

    该数据源建议只在解决方案场景中使用。

    Image

  5. PostgreSQL 数据源配置实时整库时,需将数据库集群中 wal_level 设置为 logical,支持逻辑复制机制,参数设置完成后,需重启集群生效。以下为火山引擎 PostgreSQL 实例配置截图。
    Image

  6. 检查是否可以启动wal_sender进程

    -- 查询 max_wal_senders
    show max_wal_senders;
    
    -- 查询 pg_stat_replication 数量
    select count(*) from pg_stat_replication
    

    max_wal_senders返回值不为空,且其值大于pg_stat_replication值时,意味着存在有剩余且可用的wal_sender进程。此时,PostgreSQL 数据库将会为同步数据程序启动wal_sender进程来给订阅者发送日志。

  7. PostgreSQL 数据库中待同步的数据若有超过 8KB 的字段,您需执行以下命令:

    ALTER TABLE table_name REPLICA IDENTITY FULL;
    

3 支持的字段类型

当前主要字段支持情况如下

字段类型

离线读(PostgreSQL Reader)

离线写(PostgreSQL Writer)

char

支持

支持

bpchar

支持

支持

varchar

支持

支持

text

支持

支持

character varying

支持

支持

character

支持

支持

smallint

支持

支持

int2

支持

支持

integer

支持

支持

int

支持

支持

int4

支持

支持

bigint

支持

支持

int8

支持

支持

smallserial

支持

支持

serial

支持

支持

bigserial

支持

支持

double

支持

支持

float8

支持

支持

money

支持

支持

double precision

支持

支持

numeric

支持

支持

decimal

支持

支持

real

支持

支持

float4

支持

支持

boolean

支持

支持

bool

支持

支持

date

支持

支持

time

支持

支持

timetz

支持

支持

timestamp

支持

支持

timestamptz

支持

支持

bytea

支持

支持

bit

支持

支持

bit varying

支持

支持

varbit

支持

支持

uuid

支持

支持

cidr

支持

支持

xml

支持

支持

inet

支持

支持

macaddr

支持

支持

enum

支持

支持

json

支持

支持

jsonb

支持

支持

aclitem

支持

支持

_aclitem

支持

支持

_int2

支持

支持

_int4

支持

支持

_float4

支持

支持

_text

支持

支持

_char

支持

支持

cid

支持

支持

inet

支持

支持

int2vector

支持

支持

interval

支持

支持

oid

支持

支持

_oid

支持

支持

pg_node_tree

支持

支持

box

支持

支持

line

支持

支持

lseg

支持

支持

tsquery

支持

支持

tsvector

支持

支持

polygon

支持

支持

circle

支持

支持

point

支持

支持

path

支持

支持

geography

支持

支持

gemotry

支持

不支持

4 数据同步任务开发

4.1 数据源注册

新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 PostgreSQL 数据源信息:

注意

PostgreSQL 侧如果是白名单访问机制,则不同网络环境的连接串地址,需要添加不同的 IP 地址到数据库白名单中,确保集成资源组使用的 VPC 与 PostgreSQL 网络能互通:

  • 如果使用的是公网连接串访问,则需要给集成资源组添加公网 IP,并将公网 IP 地址加入到白名单中。
  • 如果使用的是私网连接串访问,则需要将资源组 VPC 下的 IPv4 CIDR 地址加入到白名单中。

详见网络连通解决方案

参数

说明

基本配置

数据源类型

PostgreSQL

接入方式

连接串

数据源名称

数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。

参数配置

Database

输入已创建成功的 PostgreSQL 数据库名称。

SSL 模式

SSL 模式可以提供窃听攻击、中间人攻击(Man-in-the-middle MITM)、假冒攻击的保护措施。不同的 SSL 模式用于提供不同等级的保护,支持以下四种模式配置:

  • Disable:不使用 SSL 模式,适用于私网访问,不会有加解密的性能损耗。
  • allow:允许使用 SSL 模式,只有 Server 端一定需要使用 SSL 通信时,才会使用 SSL 通信,否则不使用 SSL 通信。
  • perfer:倾向于使用 SSL 模式,只要 Server 端支持 SSL ,client 端与 Server 端就会使用 SSL 通信。
  • require:需要使用 SSL 模式,适用于公网访问,会有加解密的性能的损耗。

具体说明详见 PostgreSQL SSL 官方文档。

Host

输入连接数据库时,使用的主机名或 IP 地址。

Port

PostgreSQL 数据库连接的端口号。

user

有权限访问数据库的用户名信息。

Password

输入用户名对应的密码信息。

高级参数

数据源配置支持添加 PostgreSQL 相关的高级参数,如配置 PostgreSQL 从库相关的 Host 信息,以参数名和参数值的形式填入。

4.2 新建任务

PostgreSQL 数据源测试连通性成功后,进入到数据开发界面,开始新建 PostgreSQL 相关通道任务。
新建任务方式详见离线数据同步流式数据同步

4.3 可视化配置说明

任务创建成功后,您可根据实际场景,配置PostgreSQL 批式读、PostgreSQL 批式写或 PostgreSQL 流式写等通道任务。

4.3.1 PostgreSQL 批式读

Image
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*数据源类型

下拉选择 PostgreSQL 数据源类型。

*数据源名称

已在数据源管理中注册成功的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

选择需要采集的数据表名称信息,目前单个任务只支持将单表的数据采集到一个目标表中。

数据过滤

支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,可直接填写关键词 where 后的过滤 SQL 语句,例如:create_time > '${date}',表示只同步 create_time 大于等于 ${date} 的数据,不需要填写 where 关键字。
语句填写完成后,您可单击右侧的校验按钮,进行过滤语句校验。

说明

该过滤语句通常用作增量同步,暂时不支持 limit 关键字过滤,其 SQL 语法需要和选择的数据源类型对应。
如果不配置,默认会同步全量数据。

切分建

根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键:

  • 如果表没有主键或者索引列,可以不配置该字段,同步任务不会进行分片,并以单并发的方式同步所有的数据;
  • 建议使用主键或有索引的列作为切分键,切分键配置没有索引的列同步任务会比较慢;

说明

目前仅支持类型为整型或字符串的字段作为切分建。

4.3.2 PostgreSQL 批式写

Image
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 PostgreSQL。

*数据源名称

已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

数据源下所属需数据写入的表名,下拉可选。

写入前准备语句

在执行该数据集成任务前,需要率先执行的 SQL 语句,通常是为了使任务重跑时支持幂等。
例如您可以通过填写语句,清空表中的某些旧数据,清空完成后,再执行集成任务写入新的数据。如删除 date='${date}' 的数据:delete from table_name where date='${date}'
语句填写完成后,您可单击右侧的校验按钮,进行语句校验是否符合逻辑。

说明

可视化通道任务配置中只允许执行一条写入前准备语句。

写入后准备语句

执行数据同步任务之后执行的 SQL 语句。例如写入完成后插入某条特殊的数据,标志导入任务执行结束。
语句填写完成后,您可单击右侧的校验按钮,进行语句校验是否符合逻辑。

说明

可视化通道任务配置中只允许执行一条写入后准备语句。

*数据写入方式

下拉选择数据写入 PostgreSQL 的方式:

  • insert into: 当主键/唯一性索引冲突时会无法写入冲突的行,任务会运行失败。

说明

如果希望主键/唯一索引冲突时任务正常执行可以添加高级参数: job.writer.is_insert_ignoretrue

4.3.3 PostgreSQL 流式写

支持可视化方式配置流式写入 PostgreSQL 单表。PostgreSQL Writer 通过 JDBC 远程连接 PostgreSQL 数据库,并执行相应的 SQL 语句,将数据写入 PostgreSQL。流式写入 PostgreSQL 配置方式如下:
Image
数据目标端选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。

参数

说明

*目标类型

数据去向目标类型选择 PostgreSQL。

*数据源名称

已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。
若还未建立相应数据源,可单击数据源管理按钮,前往创建 PostgreSQL 数据源。

*Schema 目录

数据库下已有的 Schema 目录信息,下拉可选。

*数据表

数据源下所属需数据写入的表名,下拉可选。

4.3.4 字段映射

数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。PostgreSQL 同时也支持您在源端配置字段自定义数据类型,来满足您更复杂的数据结构或业务特定场景。
字段映射支持选择基础模式转换模式配置映射:

注意

基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。

  • 转换模式:
    字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先通过数据转换后,以指定格式输入到目标端数据库中。
    转换模式详细操作说明详见4.1 转换模式
    在转换模式中,你可依次配置:来源节点、数据转换、目标节点信息:

    配置节点

    说明

    来源节点

    配置数据来源 Source 节点信息:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • 数据字段:通过自动添加、手动添加等方式添加数据来源字段信息。

    配置完成后,单击确认按钮,完成来源节点配置。

    数据转换

    单击数据转换右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • SQL 脚本:输入 SQL 脚本转换规则,目前仅支持添加一个转换的 SQL 语句,且不能包括 “;”。

    配置完成后,单击确认按钮,完成数据转换节点配置。SQL 脚本示例详见4.1.2 添加转换节点

    目标节点

    配置目标节点 Sink 信息:

    • 节点名称:自定义输入来源节点名称信息,只允许由数字、字母、下划线、-和.组成;且长度不能超过10。
    • 数据字段:通过自动添加、手动添加等方式添加数据目标字段信息。

    配置完成后,单击确认按钮,完成目标节点配置。

  • 基础模式:

    您可通过以下三种方式操作字段映射关系:

    • 自动添加:单击自动添加按钮,根据两端数据表信息,可以自动填充来源和目标的字段信息。
    • 手动添加:单击手动添加按钮,可以手动编辑来源和目标的字段信息,可以逐个添加。

      说明

      来源端字段信息支持输入数据库函数和常量配置,单击手动添加按钮,在源表字段中输入需添加的值,并选择函数或常量类型,例如:

      • 函数:支持您输入 now()、current_timestamp() 等 PostgreSQL 数据库支持的函数。
      • 常量:您可自定义输入常量值,'123'、'${DATE}'、'${hour}' 等,输入值两侧需要加上英文单引号,支持结合时间变量参数使用。
    • 移动\删除字段:您也可以根据需要移动字段映射顺序或删除字段。

4.4 DSL 配置说明

PostgreSQL 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 PostgreSQL Reader 和 PostgreSQL Writer 参数脚本代码,来运行数据集成任务。

4.4.1 进入 DSL 模式

进入 DSL 模式操作流程,可详见 MySQL 数据源-4.4.1 进入DSL 模式

4.4.2 PostgreSQL 批式读

进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,PostgreSQL 批式读脚本示例如下:

// 变量使用规则如下:
// 1.自定义参数变量: {{}}, 比如{{number}}
// 2.系统时间变量${}, 比如 ${date}、${hour}
// **************************************
{
    // [required] dsl version, suggest to use latest version
    "version": "0.2",
    // [required] execution mode, supoort streaming / batch now
    "type": "batch",
    // reader config
    "reader": {
        // [required] datasource type
        "type": "pg",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": 12345,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            "columns": [
                {
                    "name": "name_sample",
                    "type": "type_sample"
                }
            ],
            "filter": "id > 10",
            "split_pk": "split_pk_sample",
            "table_schema":"table_schema",
            "table_name": "table_name_sample"
        }
    },
    // writer config
    "writer": {
    },
    // common config
    "common": {
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
        }
    }
}

Reader 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:

参数名

描述

默认值

*type

数据源类型,对于 PostgreSQL 类型,填写:pg

*datasource_id

注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。

*table_schema

填写 PostgreSQL 数据库中的 Schema 名称。

*table_name

需要同步的数据表名称,目前单个任务只支持将单表的数据采集到一个目标表中。

filter

同步数据的筛选条件,同步数据时只会同步符合过滤条件的数据,直接填写关键词 where 后的过滤 SQL 语句。

  • 如将过滤条件指定为:date>=${date} ,表示只同步 date 大于等于 ${date}。
  • 过滤条件可以有效地进行业务增量同步。如果不配置,默认会同步全量数据。

split_pk

根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键,同步任务会启动并发任务进行数据同步,提高同步速率:

  • 如果表没有主键或者索引列,可以不配置该字段,同步任务不会进行分片,并以单并发的方式同步所有的数据;
  • 建议使用主键或有索引的列作为切分键,切分键配置没有索引的列同步任务会比较慢;

说明

目前仅支持类型为整型或字符串的字段作为切分建。

*columns

所配置的表中,需要同步的列名集合,使用 JSON 的数组描述字段信息。

  • 支持列裁剪:列可以挑选部分列进行导出。
  • 支持列换序:列可以不按照表 Schema 信息顺序进行导出。
  • column 必须显示指定同步的列集合,不允许为空。
  • 支持函数、常量形式添加列:
    • 函数:PostgreSQL Reader 支持您输入 now()、current_timestamp() 等 PostgreSQL 数据库支持的函数。
    • 常量:PostgreSQL Reader 支持您自定义输入常量值,如 '123'、'${DATE}'、'${hour}' 等,输入值两侧需要加上英文单引号,支持结合时间变量参数使用。

4.4.3 PostgreSQL 批式写

根据实际情况替换 PostgreSQL 批式写相应参数,PostgreSQL 批式写脚本示例如下:

// **************************************
// 变量使用规则如下:
// 1.自定义参数变量: {{}}, 比如{{number}}
// 2.系统时间变量${}, 比如 ${date}、${hour}
// **************************************
{
    // [required] dsl version, suggest to use latest version
    "version": "0.2",
    // [required] execution mode, supoort streaming / batch now
    "type": "batch",
    // reader config
    "reader": {
        // [required] datasource type
        "type": "xx",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": null,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
          
        }
    },
    // writer config
    "writer": {
        // [required] datasource type
        "type": "pg",
        // [optional] datasource id, set it if you have registered datasource
        "datasource_id": 12345,
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
            "table_schema":"table_schema",
            "table_name":"table_1",
            "pre_sql_list":[""],
            "post_sql_list":[""],
            "write_mode":"directlyInsert",
            "columns": [
                {
                    "name": "name_sample",
                    "type": "type_sample"
                }
            ]
        }
    },
    // common config
    "common": {
        // [required] user parameter
        "parameter": {
            // ********** please write here **********
            // "key" : value
        }
    }
}

Writer 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:

参数名

描述

默认值

*type

数据源类型,对于 PostgreSQL 类型,填写:pg

*datasource_id

注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。

*table_schema

填写 PostgreSQL 数据库中的 Schema 名称。

*table_name

填写需要同步的数据表名称,一个数据集成任务只能同步数据到一张目标表。

pre_sql_list

写入前准备语句:在执行数据集成任务前,率先执行的 SQL 语句。此语句通常是为了使任务重跑时支持幂等。
例如执行前清空表中的某些旧数据,清空完成后,在执行集成任务写入新的数据,例如删除 date='${date}' 的数据:["delete from table_name where date='${date}'", "xxx"]

说明

DSL 模式支持配置多条写入前准备语句,多条语句之间用英文逗号分隔。

post_sql_list

写入后准备语句:执行数据同步任务后执行的 SQL 语句。例如数据写入完成后,插入某条特殊的数据,标志导入任务执行结束。
示例:["insert into table_name (col1,col2..) values(values1,values2)", "xxx"]

说明

DSL 模式支持配置多条写入后准备语句,多条语句之间用英文逗号分隔。

*write_mode

数据导入模式,支持 insert into 模式:
insert into:当主键/唯一性索引冲突时会写不进去冲突的行,任务会运行失败。

  • 使用该模式时,请将 write_mode 设置为 directlyInsert
  • 如果希望主键/唯一索引冲突时任务正常执行,可以在 writer.parameter 参数下添加高级参数 job.writer.is_insert_ignore:true

*columns

所配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。

  • 支持列裁剪:列可以挑选部分列进行导出。
  • 支持列换序:列可以不按照表 Schema 信息顺序进行导出。

注意

  • column 必须显示指定同步的列集合,不允许为空。
  • column 必须与导入的源端列集合对齐,不允许多列或少列。

4.5 高级参数说明

  • 对于可视化通道任务,读参数需要加上 job.reader. 前缀,写参数需要加上 job.writer. 前缀,如下图所示:
    Image
  • 对于 DSL 任务,读参数请配置到 reader.parameter 下,写参数请配置到 writer.parameter 下,直接输入参数名称和参数值。如下图所示:
    Image

4.5.1 PostgreSQL 批式读

参数名

描述

默认值

init_sql

读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境

reader_fetch_size

每次拉取的数据条数,只在准确分片中有效。

10000

shard_split_mode

分片模式,支持准确分片、并发分片、不分片三种模式:

  • 准确分片(默认):根据配置的分片键将数据拆分为不同的区间,除下最后一个区间外,每个区间精准的有 reader_fetch_size 条数。
    • 拉取数据量很大的表或者分片键不是主键或者索引键时,该分片模式分片时间会比较长;
    • 该分片模式支持分片键为整型数据类型和字符串数据类型;
    • 配置方式:将该参数配置为 accurate
  • 并发分片:根据表的最大最小值,将所有的数据按照并发数进行区间分片。
    • 该分片模式仅支持分片键为整型数据类型;
    • 配置方式:将该参数配置为 parallelism
  • 不分片:不进行分片,适用于没有主键、索引键的表。
    • 配置方式:将该参数配置为 nosplit 或者不配置 split_pk

准确分片

customized_sql

自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。
例如:需要进行多表 join 后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id

说明

配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。

4.5.2 PostgreSQL 批式写

参数名

描述

默认值

is_insert_ignore

insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突

false

write_batch_interval

一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。

100

write_retry_times

PostgreSQL 写入失败时重试次数。

3

retry_interval_seconds

写入失败后两次重试的时间间隔,单位秒

write_batch_interval / 10