HBase 连接器提供了对分布式 HBase 数据库表的读写数据能力,支持做数据源表、结果表和维表。
Flink 目前提供了 HBase-1.4 和 HBase-2.2 两种连接器,请根据实际情况选择:
在公网环境中连接火山 HBase 时,您需要添加以下两个参数:
'properties.zookeeper.znode.metaserver' = 'public-meta-region-server'
'properties.zookeeper.znode.master' = 'public-master'
CREATE TABLE hbase_source ( rowkey INT, --行键 family1 ROW<q1 INT>, --声明列族、列和列名 family2 ROW<q2 STRING, q3 BIGINT>, family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 VARCHAR>, PRIMARY KEY (rowkey) NOT ENFORCED --行键定义为主键 ) WITH ( 'connector' = 'hbase-1.4', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' );
DDL 定义语句介绍:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | (none) | String | 指定使用的连接器。
|
table-name | 是 | (none) | String | 指定读取数据的 HBase 表名。 |
zookeeper.quorum | 是 | (none) | String | HBase 的 zookeeper 地址。 |
zookeeper.znode.parent | 否 | /hbase | String | HBase 在 zookeeper 中的根目录。 |
null-string-literal | 否 | null | String | HBase 字段类型为字符串时,如果 Flink 字段数据为 null,则将该字段赋值为 |
properties.* | 否 | (none) | String | 传递给 HBase 的配置参数,如需了解具体的参数,请参见HBase Default Configuration。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
sink.buffer-flush.max-size | 否 | 2mb | MemorySize | 写入 HBase 前,内存中缓存的数据量大小。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。 |
sink.buffer-flush.max-rows | 否 | 1000 | Integer | 写入 HBase 前,内存中缓存的数据条数。调大该值有利于提高 HBase 的写入性能,但会增加写入延迟和内存使用。 |
sink.buffer-flush.interval | 否 | 1s | Duration | 将缓存数据写入到 HBase 的时间周期间隔,可以控制写入 HBase 的延迟。 |
sink.parallelism | 否 | (none) | Integer | 为 HBase sink 定义并行度。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
lookup.cache | 否 | NONE | Enum | 维表的缓存策略。
|
lookup.partial-cache.max-rows | 否 | (none) | Long | 写入缓存的最大行数,超过此值,最旧的行将过期。 |
lookup.partial-cache.expire-after-write | 否 | (none) | Duration | 写入缓存后,每行在缓存中的最大存活时间。 |
lookup.partial-cache.expire-after-access | 否 | (none) | Duration | 访问缓存后,每行在缓存中的最大存活时间。 |
lookup.partial-cache.cache-missing-key | 否 | true | Boolean | 如果主键与表中所有行都不匹配,是否将继续缓存不存在的键。 |
lookup.max-retries | 否 | 3 | Integer | 查找数据库失败时的最大重试次数。 |
HBase 连接器通过org.apache.hadoop.hbase.util.Bytes
提供的实用程序类将 Flink 数据类型转换为字节数组,然后在 Hbase 存储。读取时,把字节数组转为具体类型的对象。
转换方式如下:
Flink 字段类型 | Hbase 转换 |
---|---|
CHAR / VARCHAR / STRING | byte[] toBytes(String s) |
BOOLEAN | byte[] toBytes(boolean b) |
BINARY / VARBINARY | byte[] |
DECIMAL | byte[] toBytes(BigDecimal v) |
TINYINT | new byte[] { val } |
SMALLINT | byte[] toBytes(short val) |
INT | byte[] toBytes(int val) |
BIGINT | byte[] toBytes(long val) |
FLOAT | byte[] toBytes(float val) |
DOUBLE | byte[] toBytes(double val) |
DATE | 将日期转换成自 1970.01.01 以来的天数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。 |
TIME | 将时间转换成自 00:00:00 以来的毫秒数,用 int 表示,并通过 byte[] toBytes(int val) 转换成字节数组。 |
TIMESTAMP | 将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示,并通过 byte[] toBytes(long val) 转换成字节数组。 |
ARRAY | 不支持。 |
MAP / MULTISET | 不支持。 |
ROW | 不支持。 |
结果表
CREATE TABLE datagen_source ( order_id bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_status.length' = '3', 'fields.order_id.min' = '1', 'fields.order_id.max' = '1000', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); CREATE TABLE hbase_sink ( order_id bigint, order_info ROW ( order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time TIMESTAMP(0) ) ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 'orders', 'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181' ); insert into hbase_sink select order_id, ROW ( order_product_id, order_customer_id, order_status, order_update_time ) as order_info from datagen_source;
维表
CREATE TABLE orders ( rowkey bigint, order_product_id bigint, order_customer_id bigint, order_status varchar, order_update_time as localtimestamp, proc_time as PROCTIME () ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_status.length' = '3', 'fields.order_product_id.min' = '1', 'fields.order_product_id.max' = '100', 'fields.order_customer_id.min' = '1', 'fields.order_customer_id.max' = '1000' ); CREATE TABLE hbase_sink ( rowkey bigint, cf1 ROW ( order_product_id1 bigint, order_customer_id1 bigint, order_status1 varchar, order_update_time1 TIMESTAMP(0) ) ) WITH ( 'connector' = 'hbase-1.4', 'table-name' = 'orders', 'zookeeper.quorum' = 'hb-a***929-zk.config.ivolces.com:2181' ); create table print_sink ( order_product_id bigint, order_customer_id bigint, order_status varchar, order_product_id1 bigint, order_customer_id1 bigint, order_status1 varchar ) with ('connector' = 'print'); insert into print_sink SELECT orders.order_product_id, orders.order_customer_id, orders.order_status, hbase_sink.order_product_id1, hbase_sink.order_customer_id1, hbase_sink.order_status1 FROM orders LEFT JOIN hbase_sink FOR SYSTEM_TIME AS OF orders.proc_time ON orders.rowkey = hbase_sink.rowkey;