Doris 连接器提供了 Doris 数据库的读写数据的能力,支持做数据源表和结果表。您可以创建 source 流从 Doris Table 中获取数据,作为作业的输入数据;也可以通过 Doris 结果表将作业输出数据写入到 Doris Table 中。
CREATE TABLE doris_source ( name STRING, score INT ) WITH ( 'connector' = 'doris', 'fenodes' = 'FE_IP:FE_HTTP_PORT', 'table.identifier' = 'test.sales_order', 'username' = 'root', 'password' = 'password' );
CREATE TABLE doris_sink ( name STRING, score INT ) WITH ( 'connector' = 'doris', 'fenodes' = 'FE_IP:FE_HTTP_PORT', 'table.identifier' = 'test.sales_order', 'username' = 'root', 'password' = 'password', 'sink.batch.size' = '500', 'sink.batch.interval' = '1s' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Doris 连接器。 |
fenodes | 是 | (none) | String | Doris FE 的 HTTP 地址,格式为 |
table.identifier | 是 | (none) | String | 表名,格式为 |
username | 是 | (none) | String | 登录 Doris 数据库的用户名。 |
password | 是 | (none) | String | 登录 Doris 数据库的用户密码。 |
doris.request.retries | 否 | 3 | Integer | 向 Doris 发送请求的重试次数。 |
doris.request.connect.timeout.ms | 否 | 30000 | Duration | 向 Doris 发送请求的连接超时时间。 |
doris.request.read.timeout.ms | 否 | 30000 | Duration | 向 Doris 发送请求的读取超时时间。 |
doris.request.query.timeout.s | 否 | 3600 | Duration | 查询 Doris 的超时时间。设置为 |
doris.request.tablet.size | 否 | Integer. MAX_VALUE | Integer | 一个分区对应的 Doris Tablet 个数。 |
doris.batch.size | 否 | 1024 | Integer | 单次从 BE 读取数据的最大行数。 |
doris.exec.mem.limit | 否 | 2147483648 | Long | 单次查询的内存限制。默认为 2GB,单位为 byte。 |
doris.deserialize.arrow.async | 否 | false | Boolean | 是否支持异步转换 。
|
doris.deserialize.queue.size | 否 | 64 | Integer | 异步转换 Arrow 格式的内部处理队列。 |
doris.read.field | 否 | (none) | String | 读取 Doris 表的列名,多列之间使用英文逗号分隔。 |
doris.filter.query | 否 | (none) | String | 过滤读取数据的表达式。 |
sink.batch.size | 否 | 1000 | Integer | 单次写 BE 的最大行数。 |
sink.max-retries | 否 | 1 | Integer | 写 BE 失败后的最大重试次数。 |
sink.batch.interval | 否 | 1s | Duration | Flush 间隔时间,超过该时间后异步线程将缓存数据写入 BE。 |
CREATE TABLE datagen_source ( siteid INT, citycode SMALLINT, username STRING, pv BIGINT ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', 'fields.username.length' = '3', 'fields.siteid.min' = '6', 'fields.siteid.max' = '1000', 'fields.citycode.min' = '1', 'fields.citycode.max' = '100', 'fields.pv.min' = '1', 'fields.pv.max' = '1000' ); CREATE TABLE doris_sink ( siteid INT, citycode SMALLINT, username STRING, pv BIGINT ) WITH ( 'connector' = 'doris', 'fenodes' = 'FE_IP:FE_HTTP_PORT', 'table.identifier' = 'DOC.table2', 'username' = 'root', 'password' = 'password' ); insert into doris_sink select * from datagen_source;