You need to enable JavaScript to run this app.
导航
HBase
最近更新时间:2024.02.22 17:57:39首次发布时间:2023.09.12 16:22:50

HBase 连接器提供了对分布式 HBase 数据库表的读写数据能力,支持做数据源表、结果表和维表。

使用限制

Flink 目前提供了 HBase-1.4 和 HBase-2.2 两种连接器,请根据实际情况选择:

  • Flink 1.11-volcano 引擎版本中仅支持使用 HBase-1.4 连接器。
  • Flink 1.16-volcano 引擎版本中支持使用 HBase-1.4 和 HBase-2.2 两种连接器。

注意事项

在公网环境中连接火山 HBase 时,您需要添加以下两个参数:

  • 'properties.zookeeper.znode.metaserver' = 'public-meta-region-server'
  • 'properties.zookeeper.znode.master' = 'public-master'

DDL 定义

CREATE TABLE hbase_source (
    rowkey INT,               --行键
    family1 ROW<q1 INT>,      --声明列族、列和列名
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 VARCHAR>,
    PRIMARY KEY (rowkey) NOT ENFORCED   --行键定义为主键
 ) WITH (
     'connector' = 'hbase-1.4',
     'table-name'='<yourTableName>',
     'zookeeper.quorum'='<yourZookeeperQuorum>'
 );

DDL 定义语句介绍:

  • 需要声明 HBase 的行键(Row Key)。
  • HBase 的行键需要定义为表的主键(Primary Key),如果没有定义,默认也是行键作为主键。
  • HBase 的列族(Column Family)必须声明为 ROW 类型,列族名即该 ROW 的字段名。例如,定义中声明了 family1、family2、family3 三个列族。
  • HBase 列族中的列(Cloumn)与对应 ROW 中嵌套的每个字段对应,列名即字段名。例如,列族 family2 中的 q2 和 q3 表示两列的字段名。
  • 除了类型为 ROW 的字段外,只能有一个原始类型字段,该字段将被视作 HBase 的行键(Row Key),例如,定义中的 rowkey。

WITH 参数

通用参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器。

  • Flink 1.11-volcano 引擎版本中仅支持使用 HBase-1.4 连接器。
  • Flink 1.11-volcano 引擎版本中支持使用 HBase-1.4 和 HBase-2.2 两种连接器。

table-name

(none)

String

指定读取数据的 HBase 表名。

zookeeper.quorum

(none)

String

HBase 的 zookeeper 地址。

zookeeper.znode.parent

/hbase

String

HBase 在 zookeeper 中的根目录。

null-string-literal

null

String

HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为null-string-literal,并写入 HBase。

properties.*

(none)

String

传递给 HBase 的配置参数,如需了解具体的参数,请参见HBase Default Configuration
Flink 会将properties.删除,将剩余配置传递给底层 HBase 客户端。
示例:'properties.hbase.security.authentication' = 'kerberos' 配置 Kerberos 认证。

结果表参数

参数

是否必选

默认值

数据类型

描述

sink.buffer-flush.max-size

2mb

MemorySize

写入 HBase 前,内存中缓存的数据量大小。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。
默认值为 2MB,支持字节单位 B、KB、MB 和 GB,不区分大小写。设置为 0 表示不进行缓存。

sink.buffer-flush.max-rows

1000

Integer

写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。
默认值为 1000,设置为 0 表示不进行缓存。

sink.buffer-flush.interval

1s

Duration

将缓存数据写入到 HBase 的时间周期间隔,可以控制写入 HBase 的延迟。
默认值为 1 秒,支持时间单位 ms、s、min、h 和 d。设置为 0 表示关闭定期写入。

sink.parallelism

(none)

Integer

为 HBase sink 定义并行度。
默认情况下,并行度由框架决定,与上下游算子的并行度保持一致。

维表参数

参数

是否必选

默认值

数据类型

描述

lookup.cache

NONE

Enum

维表的缓存策略。

  • NONE:无缓存。
  • PARTIAL:缓存维表里的部分数据。源表的每条数据都会触发系统先在 Cache 中查找数据,如果没有找到,则去物理维表中查找。

lookup.partial-cache.max-rows

(none)

Long

写入缓存的最大行数,超过此值,最旧的行将过期。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.expire-after-write

(none)

Duration

写入缓存后,每行在缓存中的最大存活时间。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.expire-after-access

(none)

Duration

访问缓存后,每行在缓存中的最大存活时间。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.partial-cache.cache-missing-key

true

Boolean

如果主键与表中所有行都不匹配,是否将继续缓存不存在的键。
只有设置为 PARTIAL 缓存策略时才能使用该选项。

lookup.max-retries

3

Integer

查找数据库失败时的最大重试次数。

数据类型映射

HBase 连接器通过org.apache.hadoop.hbase.util.Bytes提供的实用程序类将 Flink 数据类型转换为字节数组,然后在 Hbase 存储。读取时,把字节数组转为具体类型的对象。
转换方式如下:

Flink 字段类型

Hbase 转换

CHAR / VARCHAR / STRING

byte[] toBytes(String s)
String toString(byte[] b)

BOOLEAN

byte[] toBytes(boolean b)
boolean toBoolean(byte[] b)

BINARY / VARBINARY

byte[]

DECIMAL

byte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)

TINYINT

new byte[] { val }
bytes[0]

SMALLINT

byte[] toBytes(short val)
short toShort(byte[] bytes)

INT

byte[] toBytes(int val)
int toInt(byte[] bytes)

BIGINT

byte[] toBytes(long val)
long toLong(byte[] bytes)

FLOAT

byte[] toBytes(float val)
float toFloat(byte[] bytes)

DOUBLE

byte[] toBytes(double val)
double toDouble(byte[] bytes)

DATE

将日期转换成自 1970.01.01 以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。

TIME

将时间转换成自 00:00:00 以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。

TIMESTAMP

将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组。

ARRAY

不支持。

MAP / MULTISET

不支持。

ROW

不支持。

示例代码

  • 结果表

    CREATE TABLE datagen_source (
         order_id bigint,
         order_product_id bigint,
         order_customer_id bigint,
         order_status varchar,
         order_update_time as localtimestamp
        )
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_id.min' = '1',
        'fields.order_id.max' = '1000',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'        
    );
    
    CREATE TABLE hbase_sink (
        order_id bigint,
        order_info ROW (
          order_product_id bigint,
          order_customer_id bigint,
          order_status varchar,
          order_update_time TIMESTAMP(0)
            )
        )
    WITH (
      'connector' = 'hbase-1.4',
      'table-name' = 'orders',
      'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181'
    );
    
    insert into hbase_sink
    select
      order_id,
      ROW (
        order_product_id,
        order_customer_id,
        order_status,
        order_update_time
      ) as order_info
    from
      datagen_source;
    
  • 维表

    CREATE TABLE orders (
        rowkey bigint,
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_update_time as localtimestamp,
        proc_time as PROCTIME ()
      )
    WITH
      (
        'connector' = 'datagen',
        'rows-per-second' = '1',
        'fields.order_status.length' = '3',
        'fields.order_product_id.min' = '1',
        'fields.order_product_id.max' = '100',
        'fields.order_customer_id.min' = '1',
        'fields.order_customer_id.max' = '1000'
      );
    
    
    CREATE TABLE hbase_sink (
        rowkey bigint,
        cf1 ROW (
          order_product_id1 bigint,
          order_customer_id1 bigint,
          order_status1 varchar,
          order_update_time1 TIMESTAMP(0)
        )
      )
    WITH
      (
        'connector' = 'hbase-1.4',
        'table-name' = 'orders',
        'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181'
      );
    
    
    create table print_sink (
        order_product_id bigint,
        order_customer_id bigint,
        order_status varchar,
        order_product_id1 bigint,
        order_customer_id1 bigint,
        order_status1 varchar
      )
    with
      ('connector' = 'print');
    
    
    insert into print_sink
    SELECT
      orders.order_product_id,
      orders.order_customer_id,
      orders.order_status,
      hbase_sink.order_product_id1,
      hbase_sink.order_customer_id1,
      hbase_sink.order_status1
    FROM
      orders
      LEFT JOIN hbase_sink FOR SYSTEM_TIME AS OF orders.proc_time ON orders.rowkey = hbase_sink.rowkey;