PostgreSQL 是一种常用的关系型数据库,数据集成同步任务为您提供读取和写入 PostgreSQL 数据源的双向通道能力。
本文为您介绍 DataSail 的 PostgreSQL 数据源配置、同步任务可视化和脚本模式(DSL)配置能力,实现与不同数据源的数据互通能力。
show server_version
子账号新建数据源时,需要有项目的管理员角色,方可以进行新建数据源操作。各角色对应权限说明,详见:管理成员
确保集成同步任务使用的独享数据集成资源组,具有 PostgreSQL 数据库节点的网络访问能力。网络互通方案详见网络连通解决方案。
注意
若考虑安全因素,减少 IP CIDR 的访问范围,您至少需要将集成资源组绑定的子网下的 IPv4 CIDR 地址加入到数据库白名单中。
数据源配置时填写的用户名信息,需确保有 replication 角色权限,您可通过以下方式确认:
select userepl from pg_user where usename='xxx'; -- 预期返回结果为 True,返回 False 则表示无权限,您可以通过如下语句进行授权: ALTER USER <user> REPLICATION;
为了支持 PostgreSQL 实时整库解决方案在全量同步时读取从库,增量同步时读取主库(Binlog 必须从主库读取),需要在 PostgreSQL 数据源配置时,新增高级参数 slave_host,其对应参数值为从库域名信息。
说明
该数据源建议只在解决方案场景中使用。
PostgreSQL 数据源配置实时整库时,需将数据库集群中 wal_level 设置为 logical,支持逻辑复制机制,参数设置完成后,需重启集群生效。以下为火山引擎 PostgreSQL 实例配置截图。
检查是否可以启动wal_sender
进程
-- 查询 max_wal_senders show max_wal_senders; -- 查询 pg_stat_replication 数量 select count(*) from pg_stat_replication
当max_wal_senders
返回值不为空,且其值大于pg_stat_replication
值时,意味着存在有剩余且可用的wal_sender
进程。此时,PostgreSQL 数据库将会为同步数据程序启动wal_sender
进程来给订阅者发送日志。
PostgreSQL 数据库中待同步的数据若有超过 8KB 的字段,您需执行以下命令:
ALTER TABLE table_name REPLICA IDENTITY FULL;
当前主要字段支持情况如下
字段类型 | 离线读(PostgreSQL Reader) | 离线写(PostgreSQL Writer) |
---|---|---|
char | 支持 | 支持 |
bpchar | 支持 | 支持 |
varchar | 支持 | 支持 |
text | 支持 | 支持 |
character varying | 支持 | 支持 |
character | 支持 | 支持 |
smallint | 支持 | 支持 |
int2 | 支持 | 支持 |
integer | 支持 | 支持 |
int | 支持 | 支持 |
int4 | 支持 | 支持 |
bigint | 支持 | 支持 |
int8 | 支持 | 支持 |
smallserial | 支持 | 支持 |
serial | 支持 | 支持 |
bigserial | 支持 | 支持 |
double | 支持 | 支持 |
float8 | 支持 | 支持 |
money | 支持 | 支持 |
double precision | 支持 | 支持 |
numeric | 支持 | 支持 |
decimal | 支持 | 支持 |
real | 支持 | 支持 |
float4 | 支持 | 支持 |
boolean | 支持 | 支持 |
bool | 支持 | 支持 |
date | 支持 | 支持 |
time | 支持 | 支持 |
timetz | 支持 | 支持 |
timestamp | 支持 | 支持 |
timestamptz | 支持 | 支持 |
bytea | 支持 | 支持 |
bit | 支持 | 支持 |
bit varying | 支持 | 支持 |
varbit | 支持 | 支持 |
uuid | 支持 | 支持 |
cidr | 支持 | 支持 |
xml | 支持 | 支持 |
inet | 支持 | 支持 |
macaddr | 支持 | 支持 |
enum | 支持 | 支持 |
json | 支持 | 支持 |
jsonb | 支持 | 支持 |
aclitem | 支持 | 支持 |
_aclitem | 支持 | 支持 |
_int2 | 支持 | 支持 |
_int4 | 支持 | 支持 |
_float4 | 支持 | 支持 |
_text | 支持 | 支持 |
_char | 支持 | 支持 |
cid | 支持 | 支持 |
inet | 支持 | 支持 |
int2vector | 支持 | 支持 |
interval | 支持 | 支持 |
oid | 支持 | 支持 |
_oid | 支持 | 支持 |
pg_node_tree | 支持 | 支持 |
box | 支持 | 支持 |
line | 支持 | 支持 |
lseg | 支持 | 支持 |
tsquery | 支持 | 支持 |
tsvector | 支持 | 支持 |
polygon | 支持 | 支持 |
circle | 支持 | 支持 |
point | 支持 | 支持 |
path | 支持 | 支持 |
geography | 支持 | 支持 |
gemotry | 支持 | 不支持 |
新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 PostgreSQL 数据源信息:
注意
PostgreSQL 侧如果是白名单访问机制,则不同网络环境的连接串地址,需要添加不同的 IP 地址到数据库白名单中,确保集成资源组使用的 VPC 与 PostgreSQL 网络能互通:
详见网络连通解决方案。
参数 | 说明 |
---|---|
基本配置 | |
数据源类型 | PostgreSQL |
接入方式 | 连接串 |
数据源名称 | 数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。 |
参数配置 | |
Database | 输入已创建成功的 PostgreSQL 数据库名称。 |
SSL 模式 | SSL 模式可以提供窃听攻击、中间人攻击(Man-in-the-middle MITM)、假冒攻击的保护措施。不同的 SSL 模式用于提供不同等级的保护,支持以下四种模式配置:
具体说明详见 PostgreSQL SSL 官方文档。 |
Host | 输入连接数据库时,使用的主机名或 IP 地址。 |
Port | PostgreSQL 数据库连接的端口号。 |
user | 有权限访问数据库的用户名信息。 |
Password | 输入用户名对应的密码信息。 |
高级参数 | 数据源配置支持添加 PostgreSQL 相关的高级参数,如配置 PostgreSQL 从库相关的 Host 信息,以参数名和参数值的形式填入。 |
PostgreSQL 数据源测试连通性成功后,进入到数据开发界面,开始新建 PostgreSQL 相关通道任务。
新建任务方式详见离线数据同步、流式数据同步。
任务创建成功后,您可根据实际场景,配置PostgreSQL 批式读、PostgreSQL 批式写或 PostgreSQL 流式写等通道任务。
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*数据源类型 | 下拉选择 PostgreSQL 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 PostgreSQL 数据源,下拉可选。 |
*Schema 目录 | 数据库下已有的 Schema 目录信息,下拉可选。 |
*数据表 | 选择需要采集的数据表名称信息,目前单个任务只支持将单表的数据采集到一个目标表中。 |
数据过滤 | 支持您将需要同步的数据进行筛选条件设置,只同步符合过滤条件的数据,可直接填写关键词 where 后的过滤 SQL 语句,例如:create_time > '${date}',表示只同步 create_time 大于等于 ${date} 的数据,不需要填写 where 关键字。 说明 该过滤语句通常用作增量同步,暂时不支持 limit 关键字过滤,其 SQL 语法需要和选择的数据源类型对应。 |
切分建 | 根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键:
说明 目前仅支持类型为整型或字符串的字段作为切分建。 |
数据来源选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*目标类型 | 数据去向目标类型选择 PostgreSQL。 |
*数据源名称 | 已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。 |
*Schema 目录 | 数据库下已有的 Schema 目录信息,下拉可选。 |
*数据表 | 数据源下所属需数据写入的表名,下拉可选。 |
写入前准备语句 | 在执行该数据集成任务前,需要率先执行的 SQL 语句,通常是为了使任务重跑时支持幂等。 说明 可视化通道任务配置中只允许执行一条写入前准备语句。 |
写入后准备语句 | 执行数据同步任务之后执行的 SQL 语句。例如写入完成后插入某条特殊的数据,标志导入任务执行结束。 说明 可视化通道任务配置中只允许执行一条写入后准备语句。 |
*数据写入方式 | 下拉选择数据写入 PostgreSQL 的方式:
说明 如果希望主键/唯一索引冲突时任务正常执行可以添加高级参数: |
支持可视化方式配置流式写入 PostgreSQL 单表。PostgreSQL Writer 通过 JDBC 远程连接 PostgreSQL 数据库,并执行相应的 SQL 语句,将数据写入 PostgreSQL。流式写入 PostgreSQL 配置方式如下:
数据目标端选择 PostgreSQL,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*目标类型 | 数据去向目标类型选择 PostgreSQL。 |
*数据源名称 | 已在数据源管理界面注册的 PostgreSQL 数据源,下拉可选。 |
*Schema 目录 | 数据库下已有的 Schema 目录信息,下拉可选。 |
*数据表 | 数据源下所属需数据写入的表名,下拉可选。 |
数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。PostgreSQL 同时也支持您在源端配置字段自定义数据类型,来满足您更复杂的数据结构或业务特定场景。
字段映射支持选择基础模式和转换模式配置映射:
注意
基础模式和转换模式不支持互相切换,模式切换后,将清空现有字段映射中所有配置信息,一旦切换无法撤销,需谨慎操作。
转换模式:
字段映射支持数据转换,您可根据实际业务需求进行配置,将源端采集的数据,事先通过数据转换后,以指定格式输入到目标端数据库中。
转换模式详细操作说明详见4.1 转换模式
在转换模式中,你可依次配置:来源节点、数据转换、目标节点信息:
配置节点 | 说明 |
---|---|
来源节点 | 配置数据来源 Source 节点信息:
配置完成后,单击确认按钮,完成来源节点配置。 |
数据转换 | 单击数据转换右侧添加按钮,选择 SQL 转换方式,配置转换信息和规则:
配置完成后,单击确认按钮,完成数据转换节点配置。SQL 脚本示例详见4.1.2 添加转换节点。 |
目标节点 | 配置目标节点 Sink 信息:
配置完成后,单击确认按钮,完成目标节点配置。 |
基础模式:
您可通过以下三种方式操作字段映射关系:
说明
来源端字段信息支持输入数据库函数和常量配置,单击手动添加按钮,在源表字段中输入需添加的值,并选择函数或常量类型,例如:
PostgreSQL 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 PostgreSQL Reader 和 PostgreSQL Writer 参数脚本代码,来运行数据集成任务。
进入 DSL 模式操作流程,可详见 MySQL 数据源-4.4.1 进入DSL 模式。
进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,PostgreSQL 批式读脚本示例如下:
// 变量使用规则如下: // 1.自定义参数变量: {{}}, 比如{{number}} // 2.系统时间变量${}, 比如 ${date}、${hour} // ************************************** { // [required] dsl version, suggest to use latest version "version": "0.2", // [required] execution mode, supoort streaming / batch now "type": "batch", // reader config "reader": { // [required] datasource type "type": "pg", // [optional] datasource id, set it if you have registered datasource "datasource_id": 12345, // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value "columns": [ { "name": "name_sample", "type": "type_sample" } ], "filter": "id > 10", "split_pk": "split_pk_sample", "table_schema":"table_schema", "table_name": "table_name_sample" } }, // writer config "writer": { }, // common config "common": { // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value } } }
Reader 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:
参数名 | 描述 | 默认值 |
---|---|---|
*type | 数据源类型,对于 PostgreSQL 类型,填写:pg | 无 |
*datasource_id | 注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。 | 无 |
*table_schema | 填写 PostgreSQL 数据库中的 Schema 名称。 | 无 |
*table_name | 需要同步的数据表名称,目前单个任务只支持将单表的数据采集到一个目标表中。 | 无 |
filter | 同步数据的筛选条件,同步数据时只会同步符合过滤条件的数据,直接填写关键词 where 后的过滤 SQL 语句。
| 无 |
split_pk | 根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键,同步任务会启动并发任务进行数据同步,提高同步速率:
说明 目前仅支持类型为整型或字符串的字段作为切分建。 | 无 |
*columns | 所配置的表中,需要同步的列名集合,使用 JSON 的数组描述字段信息。
| 无 |
根据实际情况替换 PostgreSQL 批式写相应参数,PostgreSQL 批式写脚本示例如下:
// ************************************** // 变量使用规则如下: // 1.自定义参数变量: {{}}, 比如{{number}} // 2.系统时间变量${}, 比如 ${date}、${hour} // ************************************** { // [required] dsl version, suggest to use latest version "version": "0.2", // [required] execution mode, supoort streaming / batch now "type": "batch", // reader config "reader": { // [required] datasource type "type": "xx", // [optional] datasource id, set it if you have registered datasource "datasource_id": null, // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value } }, // writer config "writer": { // [required] datasource type "type": "pg", // [optional] datasource id, set it if you have registered datasource "datasource_id": 12345, // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value "table_schema":"table_schema", "table_name":"table_1", "pre_sql_list":[""], "post_sql_list":[""], "write_mode":"directlyInsert", "columns": [ { "name": "name_sample", "type": "type_sample" } ] } }, // common config "common": { // [required] user parameter "parameter": { // ********** please write here ********** // "key" : value } } }
Writer 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:
参数名 | 描述 | 默认值 |
---|---|---|
*type | 数据源类型,对于 PostgreSQL 类型,填写:pg | 无 |
*datasource_id | 注册的 PostgreSQL 数据源 ID。可以在项目控制台 > 数据源管理界面中查找。 | 无 |
*table_schema | 填写 PostgreSQL 数据库中的 Schema 名称。 | |
*table_name | 填写需要同步的数据表名称,一个数据集成任务只能同步数据到一张目标表。 | 无 |
pre_sql_list | 写入前准备语句:在执行数据集成任务前,率先执行的 SQL 语句。此语句通常是为了使任务重跑时支持幂等。 说明 DSL 模式支持配置多条写入前准备语句,多条语句之间用英文逗号分隔。 | 无 |
post_sql_list | 写入后准备语句:执行数据同步任务后执行的 SQL 语句。例如数据写入完成后,插入某条特殊的数据,标志导入任务执行结束。 说明 DSL 模式支持配置多条写入后准备语句,多条语句之间用英文逗号分隔。 | 无 |
*write_mode | 数据导入模式,支持 insert into 模式:
| 无 |
*columns | 所配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。
注意
| 无 |
job.reader.
前缀,写参数需要加上 job.writer.
前缀,如下图所示:reader.parameter
下,写参数请配置到 writer.parameter
下,直接输入参数名称和参数值。如下图所示:参数名 | 描述 | 默认值 |
---|---|---|
init_sql | 读取数据前执行的 SQL 语句。对于视图的查询可能需要使用 init SQL 语句初始化环境 | 无 |
reader_fetch_size | 每次拉取的数据条数,只在准确分片中有效。 | 10000 |
shard_split_mode | 分片模式,支持准确分片、并发分片、不分片三种模式:
| 准确分片 |
customized_sql | 自定义查询读取 SQL 语句。filter 过滤配置项不足以描述所筛选的条件,可通过该配置项来自定义执行较复杂的查询 SQL。 说明 配置该高级参数项后,数据同步任务仍需配置 table_name、column 、split_pk 、shard_split_mode 等必填配置项。然而,在执行同步时,系统将忽略这些配置项信息,直接使用该高级参数项中配置的内容进行数据查询和筛选。 | 无 |
参数名 | 描述 | 默认值 |
---|---|---|
is_insert_ignore | insert into 模式时,主键或者唯一键冲突时任务失败还是忽略冲突 | false |
write_batch_interval | 一次性批量提交的数据条数,该值可以减少与 PostgreSQL 网络的交互次数并提升整体吞吐量。如果该值设置过大可能会导致数据同步进程 OOM。 | 100 |
write_retry_times | PostgreSQL 写入失败时重试次数。 | 3 |
retry_interval_seconds | 写入失败后两次重试的时间间隔,单位秒 | write_batch_interval / 10 |