LAS Catalog provides key capabilities such as unified metadata management, data access control, metadata discovery, and data integration. The LAS Catalog connector provides read and write access to LAS Catalog tables.
Create the data catalog, databases, and tables on the LAS Catalog platform; see the LAS Catalog documentation for details. Also make sure the Flink user is granted the appropriate permissions in LAS Catalog.

Prerequisite: Flink synchronizes LAS metadata through API calls, so a sub-account access key (AK/SK) must be created in the console to authenticate that access.
Create a LAS Catalog catalog in Flink. The LAS Catalog catalog is currently implemented as an extension of the Hive catalog, so the parameters required by LAS Catalog must be passed in when creating it, so that the corresponding HMS client can be built. An example statement for creating the catalog:
```sql
CREATE CATALOG lf_catalog WITH (
  'type' = 'hive',
  'is-lf' = 'true',
  'hive-version' = '3.1.3-with-lf3',
  -- Adjust the following two options for your region
  'hive.client.las.region.name' = 'cn-beijing',
  'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  'hive.hms.client.is.public.cloud' = 'true',
  -- Fill in the sub-account AK/SK, which must be created in the console
  'hive.client.las.ak' = 'xxx',
  'hive.client.las.sk' = 'xxx'
);
```
The catalog parameters are described below:
| Parameter | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| type | Yes | hive | String | Catalog type; must be `hive`. |
| is-lf | Yes | false | Boolean | Set to `true` to access LAS Catalog metadata; set to `false` to access open-source HMS. |
| hive-version | Yes | none | String | Must be set to `3.1.3-with-lf3` when accessing LAS Catalog metadata. |
| hive.client.las.region.name | Yes | none | String | Region where LAS Catalog is located, e.g. `cn-beijing`. |
| hive.metastore.uris | Yes | none | String | Metastore address of LAS Catalog; it corresponds one-to-one with the region, e.g. `thrift://lakeformation.las.cn-beijing.ivolces.com:48869` for `cn-beijing` (see the sketch below the table). |
| hive.hms.client.is.public.cloud | Yes | false | Boolean | Set to `true` to access LAS Catalog metadata; set to `false` to access open-source HMS. |
| hive.client.las.ak | Yes | none | String | Access key of the Volcano Engine account. |
| hive.client.las.sk | Yes | none | String | Secret key of the Volcano Engine account. |
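Because the region name and metastore URI must match, a catalog for another region changes both options together. The following is a minimal sketch only: the `cn-shanghai` region name and its endpoint are assumptions extrapolated from the `cn-beijing` example above, so check the LAS Catalog documentation for the actual endpoint of your region.

```sql
CREATE CATALOG lf_catalog_sh WITH (
  'type' = 'hive',
  'is-lf' = 'true',
  'hive-version' = '3.1.3-with-lf3',
  -- Assumed region and endpoint, following the cn-beijing pattern;
  -- verify the real values in the LAS Catalog documentation
  'hive.client.las.region.name' = 'cn-shanghai',
  'hive.metastore.uris' = 'thrift://lakeformation.las.cn-shanghai.ivolces.com:48869',
  'hive.hms.client.is.public.cloud' = 'true',
  'hive.client.las.ak' = 'xxx',
  'hive.client.las.sk' = 'xxx'
);
```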
After the catalog is created, tables in Flink SQL are referenced in the three-part form `Catalog.Database.Table` and support the same read and write operations as Hive tables, in three modes: batch read, batch write, and streaming write.
Batch read example:

```sql
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog WITH (
  'type' = 'hive',
  'is-lf' = 'true',
  'hive-version' = '3.1.3-with-lf3',
  'hive.client.las.region.name' = 'cn-beijing',
  'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  'hive.hms.client.is.public.cloud' = 'true',
  'hive.client.las.ak' = 'xxx',
  'hive.client.las.sk' = 'xxx'
);

CREATE TABLE print_sink (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN,
  `day` STRING,
  `hour` STRING
) WITH (
  'connector' = 'print',
  'print-identifier' = 'out'
);

-- Batch read from a LAS Catalog table
INSERT INTO print_sink
SELECT * FROM lf_catalog.lf_db_test.parquet_partition_table;
```
Batch write example, where `${date}` is a placeholder to be replaced with the actual partition value:

```sql
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog WITH (
  'type' = 'hive',
  'is-lf' = 'true',
  'hive-version' = '3.1.3-with-lf3',
  'hive.client.las.region.name' = 'cn-beijing',
  'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  'hive.hms.client.is.public.cloud' = 'true',
  'hive.client.las.ak' = 'xxx',
  'hive.client.las.sk' = 'xxx'
);

-- Insert new data into a non-partitioned table
INSERT INTO lf_catalog.lf_db_test.mytable SELECT 'Tom', 25;

-- Overwrite a non-partitioned table
INSERT OVERWRITE lf_catalog.lf_db_test.mytable SELECT 'Tom', 25;

-- Insert new data into a partitioned table
INSERT INTO lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;

-- Overwrite a partition of a partitioned table
INSERT OVERWRITE lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;
```
Streaming write example. Note the `sink.partition-commit.policy.kind` option hint on the sink table: it sets the partition commit policy, which must be specified for streaming writes (here, committing to the LAS Catalog metastore and writing a success file):

```sql
-- Create the LAS Catalog catalog
CREATE CATALOG lf_catalog WITH (
  'type' = 'hive',
  'is-lf' = 'true',
  'hive-version' = '3.1.3-with-lf3',
  'hive.client.las.region.name' = 'cn-beijing',
  'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
  'hive.hms.client.is.public.cloud' = 'true',
  'hive.client.las.ak' = 'xxx',
  'hive.client.las.sk' = 'xxx'
);

CREATE TABLE datagen_source (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- Streaming write into a LAS Catalog table
INSERT INTO lf_catalog.lf_db_test.parquet_partition_table
  /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */
SELECT
  a, b, c, d,
  cast(current_date as string) as `day`,
  cast(hour(current_timestamp) as string) as `hour`
FROM datagen_source;
```
The connector JAR is `flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar`. After downloading it, install the LAS Catalog connector into your local Maven repository:
```shell
mvn install:install-file \
  -Dfile=flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar \
  -DgroupId=org.apache.flink \
  -DartifactId=flink-sql-connector-hive-las-formation-3_2.12 \
  -Dversion=1.16-byted-connector-SNAPSHOT \
  -Dpackaging=jar
```
This example job writes to a LAS Catalog table and consists of two main parts: registering the LAS Catalog catalog, and writing to the table through the three-part `Catalog.Database.Table` reference.

```java
package com.bigdata;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatagenToLASFormationDemo2 {
    private static final Logger LOG = LoggerFactory.getLogger(DatagenToLASFormationDemo2.class);

    public static void main(String[] args) throws Exception {
        // Fill in the LF-related parameters
        String lfRegion = "cn-beijing";
        String lfThriftUris = "thrift://lakeformation.las.cn-beijing.ivolces.com:48869";
        Boolean lfMetastoreIsPublicCloud = true;
        String lfAccessKey = "xxx";
        String lfSecretKey = "xxx";
        String hiveDatabase = "lf_db";
        String hiveTable = "lf_table";

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Build an elements source
        DataStream<String> elementStream = env.fromElements("a", "b", "c");

        // Register the LF catalog
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String catalogName = "lf_catalog";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDatabase, null, null,
                "3.1.3-with-lf3", true, lfRegion, lfThriftUris,
                lfMetastoreIsPublicCloud, lfAccessKey, lfSecretKey);
        tableEnv.registerCatalog(catalogName, hiveCatalog);

        // Convert the upstream DataStream into a Table API view so it can be queried
        tableEnv.createTemporaryView("sourceTable", elementStream);

        // Write into the LF table through the Table API.
        // Note the dynamic option hint injected after the LF table: it sets the
        // partition commit policy, which must be specified -- here it commits to
        // the LF metastore and writes a success file to TOS.
        String insertSql = "insert into " + catalogName + "." + hiveDatabase + "." + hiveTable
                + " /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */"
                + " select * from sourceTable";
        tableEnv.executeSql(insertSql);
    }
}
```
The pom.xml of the demo project is as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bigdata</groupId>
    <artifactId>flink-datastream-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.2</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-las-formation-3_${scala.binary.version}</artifactId>
            <version>1.16-byted-connector-SNAPSHOT</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-flink</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes>
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
To access a table under a different LAS Catalog data catalog, for example one named other_catalog, set the Database part of the three-part reference in Flink SQL to `other_catalog.database`, as in the following example:
```sql
SELECT * FROM lf_catalog.`other_catalog.lf_db_test`.parquet_partition_table;
```
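Writes would use the same naming convention. The following is a minimal sketch only, assuming the backtick-quoted `other_catalog.database` form is also accepted on the sink side and that a table like `mytable` from the earlier write examples exists under other_catalog:

```sql
-- Hypothetical write into a table under another data catalog; assumes the same
-- quoted `other_catalog.database` convention applies to INSERT statements
INSERT INTO lf_catalog.`other_catalog.lf_db_test`.mytable SELECT 'Tom', 25;
```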