You need to enable JavaScript to run this app.
导航
Hive Catalog
最近更新时间:2025.03.28 15:32:06首次发布时间:2025.03.28 15:32:06
我的收藏
有用
有用
无用
无用

Hive 连接器提供对 Hive 数据源的读写能力,通过使用 Hive Catalog,Flink 可以对 Hive 表做统一的批和流处理。这意味着 Flink 可以作为 Hive 批处理引擎的一个性能更好的选择,或者流式写 Hive 表中的数据以支持实时数仓应用。

使用限制

  • Hive 连接器仅支持在 Flink 1.16-volcano 及以上引擎版本中使用。
  • 流写 Hive 不支持 insert overwrite

建表方式

在 Flink SQL 语法中,数据表通过Catalog.Database.Table三段式来表示。对应到 Hive 数据源,Catalog 是 Hive Catalog。
其中流写的 Hive 表,需要设置表参数,参见流写的表级别参数

参数

创建 Catalog 的参数

参数

必选

默认值

类型

描述

type

(无)

String

Catalog 的类型。 创建 HiveCatalog 时,该参数必须设置为'hive'

name

(无)

String

Catalog 的名字。仅在使用 YAML file 时需要指定。

hive-conf-dir

(无)

String

指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。 如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。

说明

如果通过依赖文件上传到 Flink 任务中,这里固定填写 /opt/tiger/workdir

default-database

default

String

当一个catalog被设为当前catalog时,所使用的默认当前database。

hive-version

(无)

String

HiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。

hadoop-conf-dir

(无)

String

Hadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。

批读&写的作业级别参数

参数

是否必选

默认值

数据类型

描述

table.exec.hive.fallback-mapred-reader

true

Boolean

设置是否开启向量化读取的参数。
当满足以下条件时,Flink 会自动对 Hive 表进行向量化读取:

  • 格式:ORC 或者 Parquet。
  • 没有复杂类型的列,比如 Hive 列类型:List、Map、Struct、Union。

该特性默认开启,如果要禁用,则设置为 false。

table.exec.hive.infer-source-parallelism

true

Boolean

设置是否开启 Source 并发推断。默认情况下,Flink 会基于文件的数量,以及每个文件中块的数量推断出读取 Hive 的最佳并行度。
Flink 允许灵活地配置并发推断策略。
如果该参数是 true,会根据 split 的数量推断 source 的并发度。如果是 false,source 的并发度由配置决定。

说明

该参数会影响当前作业的所有 hive source。

table.exec.hive.infer-source-parallelism.max

1000

Integer

设置 source 算子推断的最大并发度。

说明

该参数会影响当前作业的所有 hive source。

table.exec.hive.split-max-size

128mb

MemorySize

设置读 Hive 表时调整数据分片大小。
读 Hive 表时, 数据文件将会被切分为若干个分片(split), 每一个分片是要读取的数据的一部分。 分片是 Flink 进行任务分配和数据并行读取的基本粒度。 用户可以通过该参数来调整每个分片的大小,来做一定的读性能调优。
该参数表示读 Hive 表时,每个分片最大可以包含的字节数 (默认是 128MB)。

说明

仅适用于 ORC 格式的 Hive 表。

table.exec.hive.file-open-cost

4mb

MemorySize

设置读 Hive 表时,打开一个文件预估的开销,以字节为单位,默认是 4MB。
如果这个值比较大,Flink 将会倾向于将 Hive 表切分为更少的分片,这在 Hive 表中包含大量小文件的时候很有用。 反之,Flink 将会倾向于将 Hive 表切分为更多的分片,这有利于提升数据读取的并行度。

说明

仅适用于 ORC 格式的 Hive 表。

table.exec.hive.calculate-partition-size.thread-num

3

Integer

设置计算所有分区下的所有文件大小的线程数。
为了调整数据分片的大小, Flink 首先将计算得到所有分区下的所有文件的大小,但是这在分区数量很多的情况下会比较耗时。通过调整该参数为一个更大的值使用更多的线程来进行加速。

table.exec.hive.load-partition-splits.thread-num

3

Integer

设置 split 切分的线程数。
Flink 使用多个线程并发将 Hive 分区切分成多个 split 进行读取。通过该参数配置线程数,以加速 split 切分过程。

table.exec.hive.sink.statistic-auto-gather.enable

true

Boolean

设置是否开启自动收集统计信息。
在使用 Flink 写入 Hive 表的时候,Flink 将默认自动收集写入数据的统计信息然后将其提交至 Hive metastore 中。 但在某些情况下,你可能不想自动收集统计信息,因为收集这些统计信息可能会花费一定的时间。 为了避免 Flink 自动收集统计信息,可以设置该参数为 false。

说明

只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。

table.exec.hive.sink.statistic-auto-gather.thread-num

3

Integer

设置收集统计信息的线程数。
对于 Parquet 或者 ORC 格式的表,为了快速收集到统计信息 numRows/rawDataSize, Flink 只会读取文件的 footer。但是在文件数量很多的情况下,这可能也会比较耗时,您可以通过设置该参数为一个更大的值来加快统计信息的收集。

说明

只有批写模式才支持自动收集统计信息,流写模式目前还不支持自动收集统计信息。

流写的表级别参数

流写的参数是表级别的,需要在 Hive 建表时,添加到表参数里。这些参数主要是 Filesystem 的表参数,请参见Filesystem
除此之外,还有一些参数是流写 Hive 独有的,如下:

参数

是否必选

默认值

数据类型

描述

sink.partition-commit.policy.kind

metastore,success-file

String

分区提交策略,具体为通知下游某个分区已经写完毕可以被读取了,推荐设置为 metastore,success-file。

  • metastore:向 metadata 增加分区。仅 hive 支持 metastore 策略,文件系统通过目录结构管理分区;
  • success-file:在目录中增加 '_success' 文件; 上述两个策略可以同时指定:'metastore,success-file'。
  • custom:通过指定的类来创建提交策略。 支持同时指定多个提交策略:'metastore,success-file'。

示例代码

示例 SQL 中的 Hive Catalog 以 default_hive 为例。

创建 Catalog

CREATE CATALOG default_hive WITH (
    'type' = 'hive',
    -- hive-site.xml 文件的路径
    'hive-conf-dir' = '/opt/tiger/workdir'
);

批读

SELECT * FROM default_hive.db.table WHERE `date`='${date}'

批写

-- 插入新数据到非分区表 
INSERT INTO default_hive.db.mytable SELECT 'Tom', 25; 
-- 覆盖写入非分区表 
INSERT OVERWRITE default_hive.db.mytable SELECT 'Tom', 25; 
-- 插入新数据到分区表 
INSERT INTO default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25; 
-- 覆盖写入分区表 
INSERT OVERWRITE default_hive.db.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;

流写

创建 Hive 表,在表属性中添加流写 Hive 相关的参数:

CREATE TABLE db.hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

流写 Hive 的 Flink SQL:

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。
) WITH (...);

INSERT INTO TABLE default_hive.db.hive_table 
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;