DataSail 中 Kudu 数据源为您提供读取和写入 Kudu 的双向通道功能,实现不同数据源与 Kudu 数据源之间进行数据传输。
本文为您介绍DataSail的Kudu数据同步的能力支持情况。
DataSail 支持 Kudu 1.14.0 及以上自建的开源版本。
子账号新建数据源时,需要有项目的管理员角色,方可以进行新建数据源操作。各角色对应权限说明,详见:管理成员
目前仅支持可视化离线读取 Kudu 数据和脚本模式(DSL)读取、写入 Kudu 数据。
类型分类 | 数据类型 | 备注 |
---|---|---|
整数类型 | INT8、INT16、INT32、INT64 | |
字符串类型 | String、varchar | |
浮点类型 | FLOAT、DOUBLE、DECIMAL | |
布尔类型 | BOOLEAN | BOOLEAN 别名 BOOL |
日期时间类型 | DATE、unixtime_micros | unixtime_micros 别名 DATETIME |
二进制类型 | BINARY |
新建数据源操作详见配置数据源,下面为您介绍用连接串方式配置 Kudu 数据源信息:
参数 | 说明 |
---|---|
基本配置 | |
数据源类型 | Kudu |
接入方式 | 连接串 |
数据源名称 | 数据源的名称,可自行设置,仅支持中文,英文,数字,“_”,100个字符以内。 |
参数配置 | |
服务列表 | 输入 Kudu 对应的服务地址列表信息,以 IP:port 形式,如 ip1:7510。 |
认证方式 | 下拉选择 Kudu 认证方式,目前仅支持选择“无”认证。 |
Kudu 数据源测试连通性成功后,进入到数据开发界面,开始新建 Kudu 相关通道任务。
新建任务方式详见离线数据同步。
任务创建成功后,您可根据实际场景,配置 Kudu 离线读通道任务。
数据来源选择 Kudu,并完成以下相关参数配置:
其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数。
参数 | 说明 |
---|---|
*数据源类型 | 下拉选择 Kudu 数据源类型。 |
*数据源名称 | 已在数据源管理中注册成功的 Kudu 数据源,下拉可选。 |
*数据表 | 选择需要采集的数据表名称信息。 |
分库分表 | Kudu 支持分库分表形式读取,单击添加分库分表按钮,进行分库分表添加,在下拉框中选择分库数据源与具体分表名称信息,同时支持添加多个分库分表。 说明 配置分库分表,需要所有表的 Schema 信息必须保持一致,否则任务会执行异常。 |
数据过滤 | 可自定义配置全量或增量读取数据的条件表达式,Kudu 数据过滤是一个 Json,需与 Kudu 语法命令保持一致,示例:"["AND", ["{'>'}=", "key", 1000], ["IN", "age", [999, 1001, 1003, 1005, 1007, 1009]], ["NOTNULL","name"]]" |
切分建 | 根据配置的字段进行数据分片,建议使用主键或有索引的列作为切分键:
说明 目前仅支持类型为整型或字符串的字段作为切分建。 |
Kudu 可视化离线写配置方式暂未支持。
数据来源和目标端配置完成后,需要指定来源和目标端的字段映射关系,根据字段映射关系,数据集成任务将源端字段中的数据,写入到目标端对应字段中。
您可通过以下三种方式操作字段映射关系:
自动添加:单击自动添加按钮,根据两端数据表信息,可以自动填充来源和目标的字段信息。
手动添加:单击手动添加按钮,可以手动编辑来源和目标的字段信息,可以逐个添加。
移动\删除字段:您也可以根据需要移动字段映射顺序或删除字段。
Kudu 数据源支持使用脚本模式(DSL)的方式进行配置。
在某些复杂场景下,或当数据源类型暂不支持可视化配置时,您可通过任务脚本的方式,按照统一的 Json 格式,编写 Kudu Reader 和 Kudu Writer 参数脚本代码,来运行数据集成任务。
进入 DSL 模式操作流程,可详见 MySQL 数据源-4.4.1 进入DSL 模式。
进入 DSL 模式编辑界面后,您可根据实际情况替换相应参数,Kudu Reader 脚本示例如下:
{ "version": "0.2", "type": "batch", // reader config "reader": { "type": "kudu", "datasource_id": null, "parameter": { "class": "com.bytedance.bitsail.connector.kudu.source.KuduSource", "kudu_table_name": "sink_test2", "kudu_master_address_list": [ "172.16.0.66:7051" ], "read_mode": "READ_LATEST", "predicates": "[\"AND\", [\">=\", \"key\", 1000], [\"IN\", \"age\", [999, 1001, 1003, 1005, 1007, 1009]], [\"NOTNULL\",\"name\"]]", "columns": [ { "name": "key", "type": "long", "properties": "unique" }, { "name": "age", "type": "int" }, { "name": "name", "type": "string" } ] } }, // writer config "writer": { ... }, "common": { ... } }
Reader 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:
参数名称 | 参数含义 |
---|---|
*type | 读数据源类型,对于 Kudu 类型,默认固定值填写:kudu |
*datasource_id | 填写注册的 Kudu 数据源ID,可以在项目控制台 > 数据源管理界面中查找。
|
*class | Kudu reader connector type,默认固定值 com.bytedance.bitsail.connector.kudu.source.KuduSource |
*kudu_table_name | 填写需读取的 Kudu 表名。 |
*kudu_master_address_list | Kudu 的服务地址列表信息:ip:port 形式输入。 |
*columns | 所配置的表中,需要同步的列名集合,使用 JSON 的数组描述字段信息。 |
read_mode | 填写 Kudu 表的读取模式,支持设置以下三种读取模式:READ_LATEST、READ_AT_SNAPSHOT、READ_YOUR_WRITES 。 |
predicates | 设置同步任务数据的过滤条件,同步时只同步符合过滤条件的数据。 |
enable_fault_tolerant | scanner 是否开启 fault_tolerant。 |
scan_batch_size_bytes | 设置每次读取的字节数大小。 |
scan_max_count | 设置读取最大条数。 |
scan_timeout_ms | scan操作超时时间 |
scan_split_size_bytes | 对一个tablet,按照主键设置split大小 |
根据实际情况替换 Kudu Writer 相应参数,Kudu Writer 脚本示例如下:
{ "version": "0.2", "type": "batch", "reader": { ... }, // writer config "writer": { "type": "kudu", "datasource_id": null, "parameter": { "class": "com.bytedance.bitsail.connector.kudu.sink.KuduSink", "kudu_table_name": "sink_test2", "kudu_master_address_list": ["172.16.0.66:7051"], "columns": [ { "name": "key", "type": "long", "properties": "unique" }, { "name": "age", "type": "int" }, { "name": "name", "type": "string" } ] } }, // common config "common": { ... } }
Writer 参数说明,其中参数名称前带 * 的为必填参数,名称前未带 * 的为可选填参数:
参数名称 | 参数含义 |
---|---|
*type | 写数据源类型,对于 Kudu 类型,默认固定值填写:kudu |
*datasource_id | 填写注册的 Kudu 数据源ID,可以在项目控制台 > 数据源管理界面中查找。
|
*class | Kudu writer connector type, 默认固定值:com.bytedance.bitsail.connector.kudu.sink.KuduSink |
*kudu_table_name | 填写需写入的 Kudu 表名。 |
*kudu_master_address_list | Kudu 的服务地址列表信息:ip:port 形式输入。 |
*columns | 所配置的表中,需要同步的列名集合,使用 JSON 的数组描述字段信息。 |