You need to enable JavaScript to run this app.
文档中心
流式计算 Flink版

流式计算 Flink版

复制全文
下载 pdf
Connector 参考
HBase
复制全文
下载 pdf
HBase

HBase 连接器提供了对分布式 HBase 数据库表的读写数据能力,支持做数据源表、结果表和维表。

使用限制

Flink 目前提供了 HBase-1.4 和 HBase-2.2 两种连接器,请根据实际情况选择:

  • Flink 1.11-volcano 引擎版本中仅支持使用 HBase-1.4 连接器。
  • Flink 1.16-volcano 引擎版本中支持使用 HBase-1.4 和 HBase-2.2 两种连接器。

注意事项

在公网环境中连接火山 HBase 时,您需要添加以下两个参数:

  • 'properties.zookeeper.znode.metaserver' = 'public-meta-region-server'
  • 'properties.zookeeper.znode.master' = 'public-master'

DDL 定义

CREATE TABLE hbase_source (
    rowkey INT,               --行键
    family1 ROW<q1 INT>,      --声明列族、列和列名
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 VARCHAR>,
    PRIMARY KEY (rowkey) NOT ENFORCED   --行键定义为主键
 ) WITH (
     'connector' = 'hbase-2.2',
     'table-name'='<yourTableName>',
     'zookeeper.znode.parent'='/hbase/hb-cnxxxxxxxxxxxxxx', -- HBase 必须明确指定相关路径
     'zookeeper.quorum'='<yourZookeeperQuorum>'
 );

DDL 定义语句介绍:

  • 需要声明 HBase 的行键(Row Key)。
  • HBase 的行键需要定义为表的主键(Primary Key),如果没有定义,默认也是行键作为主键。
  • HBase 的列族(Column Family)必须声明为 ROW 类型,列族名即该 ROW 的字段名。例如,定义中声明了 family1、family2、family3 三个列族。
  • HBase 列族中的列(Cloumn)与对应 ROW 中嵌套的每个字段对应,列名即字段名。例如,列族 family2 中的 q2 和 q3 表示两列的字段名。
  • 除了类型为 ROW 的字段外,只能有一个原始类型字段,该字段将被视作 HBase 的行键(Row Key),例如,定义中的 rowkey。

WITH 参数

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器。

  • Flink 1.11-volcano 引擎版本中仅支持使用 HBase-1.4 连接器。
  • Flink 1.16-volcano 引擎版本中支持使用 HBase-1.4 和 HBase-2.2 两种连接器。

table-name

(none)

String

指定读取数据的 HBase 表名。

zookeeper.quorum

(none)

String

HBase 的 zookeeper 地址。

zookeeper.znode.parent

/hbase

String

HBase 在 zookeeper 中的根目录。

注意:如果您使用表格数据库 HBase 版,则必须要填写该字段,并且填入对应的 parent 路径

null-string-literal

null

String

HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为null-string-literal,并写入 HBase。

properties.*

(none)

String

传递给 HBase 的配置参数,如需了解具体的参数,请参见HBase Default Configuration
Flink 会将properties.删除,将剩余配置传递给底层 HBase 客户端。
示例:'properties.hbase.security.authentication' = 'kerberos' 配置 Kerberos 认证。

结果表参数

参数

是否必选

默认值

数据类型

描述

sink.buffer-flush.max-size

2mb

MemorySize

写入 HBase 前,内存中缓存的数据量大小。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。
默认值为 2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为 0 表示不进行缓存。

sink.buffer-flush.max-rows

1000

Integer

写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。
默认值为 1000,设置为 0 表示不进行缓存。

sink.buffer-flush.interval

1s

Duration

将缓存数据写入到 HBase 的时间周期间隔,可以控制写入 HBase 的延迟。
默认值为 1 秒,支持时间单位 ms、s、min、h 和 d。设置为 0 表示关闭定期写入。

sink.parallelism

(none)

Integer

为 HBase sink 定义并行度。
默认情况下,并行度由框架决定,与上下游算子的并行度保持一致。

维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache

NONE

Enum

维表的缓存策略。

  • NONE:无缓存。
  • PARTIAL:缓存维表里的部分数据。源表的每条数据都会触发系统先在 Cache 中查找数据,如果没有找到,则去物理维表中查找。

lookup.partial-cache.max-rows

(none)

Long

写入缓存的最大行数,超过此值,最旧的行将过期。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.expire-after-write

(none)

Duration

写入缓存后,每行在缓存中的最大存活时间。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.expire-after-access

(none)

Duration

访问缓存后,每行在缓存中的最大存活时间。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.cache-missing-key

true

Boolean

如果主键与表中所有行都不匹配,是否将继续缓存不存在的键。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.max-retries

3

Integer

查找数据库失败时的最大重试次数。

数据类型映射

HBase 连接器通过org.apache.hadoop.hbase.util.Bytes提供的实用程序类将 Flink 数据类型转换为字节数组,然后在 Hbase 存储。读取时,把字节数组转为具体类型的对象。
转换方式如下:

Flink 字段类型

Hbase 转换

CHAR / VARCHAR / STRING

byte[] toBytes(String s)
String toString(byte[] b)

BOOLEAN

byte[] toBytes(boolean b)
boolean toBoolean(byte[] b)

BINARY / VARBINARY

byte[]

DECIMAL

byte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }
bytes[0]

SMALLINT

byte[] toBytes(short val)
short toShort(byte[] bytes)

INT

byte[] toBytes(int val)
int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)
long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)
float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)
double toDouble(byte[] bytes)

DATE

将日期转换成自 1970.01.01 以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。

TIME

将时间转换成自 00:00:00 以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。

TIMESTAMP

将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组。

ARRAY

不支持。

MAP / MULTISET

不支持。

ROW

不支持。

示例代码

  • 结果表

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '1000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'        
    );
    
    CREATE TABLE hbase_sink (
        order_id bigint,
        order_info ROW (
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time TIMESTAMP(0)
            )
        )
    WITH (
      'connector' = 'hbase-2.2',
      'table-name' = 'orders',
      'zookeeper.znode.parent'='/hbase/hb-cnxxxxxxxxxxxxxx', -- HBase 云产品必须明确指定相关路径
      'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181'
    );
    
    insert into hbase_sink
    select
      order_id,
      ROW (
        order_product_id,
        order_customer_id,
        order_status,
        order_update_time
      ) as order_info
    from
      datagen_source;
    
  • 维表

    CREATE TABLE orders (
        rowkey bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp,
        proc_time as PROCTIME ()
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    CREATE TABLE hbase_sink (
        rowkey bigint,
        cf1 ROW (
          order_product_id1 bigint,
          order_customer_id1 bigint,
          order_status1 varchar,
          order_update_time1 TIMESTAMP(0)
        )
      )
    WITH
      (
        'connector' = 'hbase-2.2',
        'table-name' = 'orders',
        'zookeeper.znode.parent'='/hbase/hb-cnxxxxxxxxxxxxxx', -- HBase 云产品必须明确指定相关路径
        'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181'
      );
    
    
    create table print_sink (
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_product_id1 bigint,
        order_customer_id1 bigint,
        order_status1 varchar
      )
    with
      ('connector' = 'print');
    
    
    insert into print_sink
    SELECT
      orders.order_product_id,
      orders.order_customer_id,
      orders.order_status,
      hbase_sink.order_product_id1,
      hbase_sink.order_customer_id1,
      hbase_sink.order_status1
    FROM
      orders
      LEFT JOIN hbase_sink FOR SYSTEM_TIME AS OF orders.proc_time ON orders.rowkey = hbase_sink.rowkey;
    
最近更新时间:2025.11.25 16:05:22
这个页面对您有帮助吗?
有用
有用
无用
无用