Elasticsearch 连接器提供了对 Elasticsearch 数据分析引擎的写入能力,仅支持做数据结果表。流式计算 Flink 版支持 Elasticsearch-6 和 Elasticsearch-7 两个版本,部分配置存在差异,请注意区分。
CREATE TABLE elasticsearch_sink ( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourDocumentType>', 'username' ='<yourUsername>', 'password' ='<yourPassword>' );
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器,此处是 Elasticsearch-6 或 Elasticsearch-7 连接器。 |
hosts | 是 | (none) | String | Elasticsearch 主机地址,支持 HTTP 和 HTTPS 协议。 |
index | 是 | (none) | String | 索引目录。支持静态索引和动态索引两种方式。
您还可以使用 {field_name|date_format_string} 将TIMESTAMP、DATE 和 TIME 类型的字段值转换为 date_format_string 指定的格式。 |
document-type | Elasticsearch-6 中必选 | (none) | String | 文档类型。 |
document-id.key-delimiter | 否 | _ | String | 文档 ID 的分隔符,默认为 |
failure-handler | 否 | fail | String | Elasticsearch 请求失败时的故障处理策略。取值如下:
|
sink.flush-on-checkpoint | 否 | true | Boolean | 是否在 Checkpoint 时执行 Flush。 |
sink.bulk-flush.max-actions | 否 | 1000 | Integer | 批量写入的最大条数。 设置为 |
sink.bulk-flush.max-size | 否 | 2mb | MemorySize | 批量写入时的最大缓存量,单位是 MB。设置为 |
sink.bulk-flush.interval | 否 | 1s | Duration | 批量写入时的刷新间隔。 |
sink.bulk-flush.backoff.strategy | 否 | DISABLED | String | 如果由于临时请求错误导致 Flush 操作失败,则可以指定重试策略。取值如下:
|
sink.bulk-flush.backoff.max-retries | 否 | 8 | Integer | 批量写入失败,最大的重试次数。 |
sink.bulk-flush.backoff.delay | 否 | 50ms | Duration | 批量写入失败,重试的时间间隔(对于 CONSTANT 重试策略);也可理解为重试的间隔时间基数(对于 EXPONENTIAL 重试策略)。 |
connection.max-retry-timeout | 否 | (none) | Duration | 重试请求的最大超时时长。 |
connection.path-prefix | 否 | (none) | String | 添加到每个 REST 请求中的前缀字符串,一般不设置。 |
format | 否 | json | String | 指定输出的格式,默认是内置的 |
Elasticsearch-6 结果表
create table orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table es_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200', 'index' = 'test_orders', 'document-type' = 'doc', 'username' = 'admin', 'password' = 'cd****456' ); insert into es_sink select * from orders;
Elasticsearch-7 结果表
create table orders ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '10000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); create table es_sink ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time timestamp ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'https://elasticsearch-wu****l8e.escloud.ivolces.com:9200', 'index' = 'test_orders', 'username' = 'admin', 'password' = 'cd****456' ); insert into es_sink select * from orders;