Hive 连接器提供对 Hive 数据源的读写能力,通过使用 Hive Catalog,Flink 可以对 Hive 表做统一的批和流处理。这意味着 Flink 可以作为 Hive 批处理引擎的一个性能更好的选择,或者流式写 Hive 表中的数据以支持实时数仓应用。
insert overwrite
。在 Flink SQL 语法中,数据表通过Catalog
.Database
.Table
三段式来表示。对应到 Hive 数据源,Catalog 是 Hive Catalog。
其中流写的 Hive 表,需要设置表参数,参见流写的表级别参数。
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
table.exec.hive.fallback-mapred-reader | 否 | true | Boolean | 设置是否开启向量化读取的参数。
该特性默认开启,如果要禁用,则设置为 false。 |
table.exec.hive.infer-source-parallelism | 否 | true | Boolean | 设置是否开启 Source 并发推断。默认情况下,Flink 会基于文件的数量,以及每个文件中块的数量推断出读取 Hive 的最佳并行度。 说明 该参数会影响当前作业的所有 hive source。 |
table.exec.hive.infer-source-parallelism.max | 否 | 1000 | Integer | 设置 source 算子推断的最大并发度。 说明 该参数会影响当前作业的所有 hive source。 |
table.exec.hive.split-max-size | 否 | 128mb | MemorySize | 设置读 Hive 表时调整数据分片大小。 说明 仅适用于 ORC 格式的 Hive 表。 |
table.exec.hive.file-open-cost | 否 | 4mb | MemorySize | 设置读 Hive 表时,打开一个文件预估的开销,以字节为单位,默认是 4MB。 说明 仅适用于 ORC 格式的 Hive 表。 |
table.exec.hive.calculate-partition-size.thread-num | 否 | 3 | Integer | 设置计算所有分区下的所有文件大小的线程数。 |
table.exec.hive.load-partition-splits.thread-num | 否 | 3 | Integer | 设置 split 切分的线程数。 |
table.exec.hive.sink.statistic-auto-gather.enable | 否 | true | Boolean | 设置是否开启自动收集统计信息。 说明 只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。 |
table.exec.hive.sink.statistic-auto-gather.thread-num | 否 | 3 | Integer | 设置收集统计信息的线程数。 说明 只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。 |
流写的参数是表级别的,需要在 Hive 建表时,添加到表参数里。这些参数主要是 Filesystem 的表参数,请参见Filesystem。
除此之外,还有一些参数是流写 Hive 独有的,如下:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
sink.partition-commit.policy.kind | 是 | metastore,success-file | String | 分区提交策略,具体为通知下游某个分区已经写完毕可以被读取了,推荐设置为 metastore,success-file。
|
示例 SQL 中的 Hive Catalog 以 default_hive 为例。
SELECT * FROM default_hive.db.table WHERE `date`='${date}'
-- 插入新数据到非分区表 INSERT INTO default_hive.db.mytable SELECT 'Tom', 25; -- 覆盖写入非分区表 INSERT OVERWRITE default_hive.db.mytable SELECT 'Tom', 25; -- 插入新数据到分区表 INSERT INTO default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25; -- 覆盖写入分区表 INSERT OVERWRITE default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;
创建 Hive 表,在表属性中添加流写 Hive 相关的参数:
CREATE TABLE db.hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.policy.kind'='metastore,success-file' );
流写 Hive 的 Flink SQL:
CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。 ) WITH (...); INSERT INTO TABLE default_hive.db.hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;