Redis 连接器提供了对 Redis 缓存数据库的写入能力,支持做数据结果表和维表。
使用 Redis 连接器做数据结果表和维表时,有不同的扩展优势:
类型 | 优势 |
---|---|
结果表 |
|
维表 |
|
CREATE TABLE redis_sink ( key VARCHAR PRIMARY KEY NOT ENFORCED, val VARCHAR ) WITH ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'value-type' = 'string' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Redis 连接器。 |
value-type | 否 | string | Enum | Redis 数据库支持的数据类型。取值如下:
|
format | 否 | (none) | String | 数据类型为 String 的时候,序列化和反序列化的格式,取值为:json、pb 等。 说明 如果配置 format ,则无需配置 value-type。 |
redis-mode | 是 | single-node | Enum | 连接 Redis 数据库的模式,可选类型:
|
host | (单点模式必须) | 172.0.0.1 | String | 单点模式下的 Redis Server 地址,如 |
port | (单点模式必须) | 6379 | String | 单点模式下的 Redis Server 连接端口,默认值为6379。 |
cluster.nodes | (集群模式必须) | (none) | String | 集群模式的 host 列表,为分号分隔的 host 和 ip 列表,host 和 ip 之间用分号(;)间隔。例如:“127.0..:6379;10.133..:6379;10.248..:6379” |
master.name | (sentinel模式必须) | (none) | String | sentinel 模式的 mater 名称。 |
sentinels.info | (sentinel模式必须) | (none) | String | sentinel 模式的 sentinel 信息,分号分隔的列表。 |
db | 否 | 0 | Integer | 连接的 Redis 数据库。 |
password | 否 | (none) | String | Redis 数据库登录密码。默认值为空,表示不进行权限验证。 |
connection.timeout | 否 | 2 seconds | Duration | 连接超时时间。 |
connection.socket.timeout | 否 | 2 seconds | Duration | socket 超时时间。 |
connection.max-retries | 否 | 5 | Integer | 连接失败最大重试次数。 |
connection.max-total-num | 否 | 5 | Integer | 最大连接数。 |
connection.max-idle-num | 否 | 5 | Integer | 最大空闲连接数。 |
connection.min-idle-num | 否 | 5 | Integer | 最小空闲连接数。 |
rate-limit-num | 否 | (none) | Integer | 作业的限速配置。 说明 限速会在各个 subtask 之间平均分配额度,考虑到数据均衡和 quota 计算整除,限速一定不要设置太小。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
sink.mode | 否 | insert | Enum | 数据写入模式。取值如下:
说明 incr 增量写入模式用于支持 String 和 Hash 的 incrby 相关场景使用,其他场景写入请使用默认值。 |
sink.record.ttl | 否 | (none) | Duration | 写入数据的过期时间,默认为永久保存。
说明 设置时如果不写单位,则默认为毫秒(milliseconds);单位不区分大小写,数字与单位间的空格可省略。 |
sink.max-retries | 否 | 5 | Integer | 写入失败的最大重试次数。 |
value.format.skip-key | 否 | true | Boolean | 序列化写入的时候 value 中是否包含主键。
|
sink.ignore-delete | 否 | true | Boolean | 是否过滤掉上游 retract 消息。 |
sink.ignore-null | 否 | false | Boolean | 是否过滤掉 null 值写入。 |
sink.buffer-flush.max-rows | 否 | 50 | Integer | Batch 写入记录数:用于确定在写入 Redis 之前进行攒批操作的最大记录数。当记录数超过此配置值时,会将数据写入 Redis。可设为 0 来禁用此攒批写入功能。 |
sink.buffer-flush.interval | 否 | 2s | Duration | Batch 写入间隔时间:用于确定异步线程向 Redis 写入数据的时间间隔。当达到该间隔时,数据会被写入 Redis。可设为 0 来禁用此定时写入功能。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
lookup.cache.max-rows | 否 | (none) | Integer | 维表缓存的最大数据条数。 说明 缓存默认关闭,如需开启, |
lookup.cache.ttl | 否 | (none) | Duration | 维表缓存的过期时间。 说明 缓存默认关闭,如需开启, |
lookup.max-retries | 否 | 3 | Integer | 维表查询失败的最大重试次数。 |
lookup.cache-null-value | 否 | true | Boolean | 是否缓存 null 数据,仅在开启缓存时才生效。 |
lookup.later-join-retry-times | 否 | 0 | Duration | 延迟 Join 的间隔时间。 |
lookup.later-join-latency | 否 | 1 | Integer | 延迟 Join 的重试次数。 |
lookup.enable-input-keyby | 否 | false | Boolean | 上游数据是否按照 Join 的 key 进行 hash 下发到 Join 算子。 |
create table datagen_source ( key varchar, val varchar ) with ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.key.length' = '2', 'fields.val.length' = '5' ); create table redis_sink ( key varchar PRIMARY KEY NOT ENFORCED, val varchar ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'string' ); insert into redis_sink select * from datagen_source;
create table datagen_source ( key varchar, val varchar ) with ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.key.length' = '2', 'fields.val.length' = '5' ); create table redis_sink ( key varchar PRIMARY KEY NOT ENFORCED, val varchar ) with ( 'connector' = 'redis', 'redis-mode' = 'cluster', 'cluster.nodes' = 'redis-cnl**ymz.redis.ivolces.com:6379', 'password' = 'Pw**5', 'value-type' = 'string' ); insert into redis_sink select * from datagen_source;
指定 Format
create table redis_tb ( id int primary key not enforced, name varchar, age int, gender boolean, description varchar ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'format' = 'json' )
增量写入
create table redis_tb ( id int primary key not enforced, val long ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'string', 'sink.mode' = 'incr' )
普通读写
create table redis_tb ( key int primary key not enforced, field1 varchar, field2 int, field3 boolean ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'hash' )
增量写入
create table redis_tb ( key int primary key not enforced, field1 varchar, val long, ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'hash', 'sink.mode' = 'incr' )
普通读写
create table redis_tb ( key int primary key not enforced, item varchar, vals array<varchar> ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'list' )
普通读写
create table redis_tb ( key int primary key not enforced, score double, item varchar, vals array<varchar> ) with ( 'connector' = 'redis', 'redis-mode' = 'single-node', 'host' = '172.0.0.1', 'port' = '6379', 'password' = 'Pw**5', 'value-type' = 'zset' )