You need to enable JavaScript to run this app.
导航
LAS Catalog
最近更新时间:2025.03.28 15:35:55首次发布时间:2025.03.28 15:35:55
我的收藏
有用
有用
无用
无用

LAS Catalog 通过提供的统一元数据管理、数据访问控制、元数据发现和数据集成等关键功能。LAS Catalog 连接器提供对 LAS Catalog 表的读写能力。

1. 使用限制

  • 需要确保开通 LAS Catalog 统一元数据管理服务,可以参考 LAS Catalog 开通文档
  • LAS Catalog 连接器仅支持在 Flink 1.16-volcano 及以上引擎版本中使用

2. 使用步骤

2.1 LAS 建表

在 LAS Catalog 平台上创建数据目录、数据库和数据表,详细参见 LAS Catalog 文档。另外要确保 LAS Catalog 为 Flink 用户开通合理的权限。

2.2 创建 LAS Catalog Catalog

前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 WTIH 参数中)。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

在 Flink 里创建 LAS Catalog Catalog,当前 LAS Catalog Catalog 是基于 Hive Catalog 扩展实现的,在使用上需要传递 LAS Catalog 所需的参数,以创建对应的 HMS client。具体创建 Catalog 的语句示例如下:

CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    -- 以下根据区域进行调整 
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    -- 填写相关子账号 AK/SK,需要在控制台创建
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

Catalog 各参数解释如下:

参数

是否必选

默认值

数据类型

描述

type

hive

String

Catalog 类型是 Hive。

is-lf

false

Boolean

设置为 true,表示访问 LAS Catalog 元信息;设置为 false,表示访问开源 HMS。

hive-version

String

访问 LAS Catalog 元信息时,需要设置为 3.1.3-with-lf3。

hive.client.las.region.name

String

LAS Catalog 所在的区域,如下:

  • 华北:cn-beijing
  • 华东:cn-shanghai
  • 华南:cn-guangzhou
  • 柔佛:ap-southeast-1

hive.metastore.uris

String

LAS Catalog 的 metastore 地址,和所在的区域一一对应:

  • 华北:thrift://lakeformation.las.cn-beijing.ivolces.com:48869
  • 华东:thrift://lakeformation.las.cn-shanghai.ivolces.com:48869
  • 华南:thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869
  • 柔佛:thrift://lakeformation.las.ap-southeast-1.ivolces.com:48869

hive.hms.client.is.public.cloud

false

Boolean

设置为 true,表示访问 LAS Catalog 元信息;设置为 false,表示访问开源 HMS。

hive.client.las.ak

String

火山账号的 access key

hive.client.las.sk

String

火山账号的 secret key

2.3 读写 LAS Catalog 表

创建 Catalog 之后,在 Flink SQL 中,数据表通过Catalog.Database.Table三段式来表示,可以直接进行和 Hive 表一样的读写操作,包括批读、批写和流写三种模式。

2.3.1 批读

-- 创建 LAS Catalog Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

CREATE TABLE print_sink (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN,
  `day` STRING,
  `hour` STRING
) WITH (
     'connector' = 'print',
     'print-identifier' = 'out'            
);

-- 批读 LAS Catalog 表
INSERT INTO print_sink
SELECT * FROM lf_catalog.lf_db_test.parquet_partition_table;

2.3.2 批写

-- 创建 LAS Catalog Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

-- 插入新数据到非分区表 
INSERT INTO lf_catalog.lf_db_test.mytable SELECT 'Tom', 25; 
-- 覆盖写入非分区表 
INSERT OVERWRITE lf_catalog.lf_db_test.mytable SELECT 'Tom', 25; 
-- 插入新数据到分区表 
INSERT INTO lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25; 
-- 覆盖写入分区表 
INSERT OVERWRITE lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;

2.3.3 流写

-- 创建 LAS Catalog Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

CREATE TABLE datagen_source (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN
)
WITH
(
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- 流写 LAS Catalog 表
INSERT INTO lf_catalog.lf_db_test.parquet_partition_table 
  /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */
SELECT
  a, b, c, d,
  cast(current_date as string) as `day`,
  cast(hour(current_timestamp) as string) as `hour`
FROM
  datagen_source;

3. DataStream Connector 使用步骤

3.1 加载 LAS Catalog Connector 依赖

Connector Jar 包如下:

flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT (4).jar
未知大小

下载到本地后,通过 maven install 安装 LAS Catalog Connector 到本地:

mvn install:install-file -Dfile=flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar -DgroupId=org.apache.flink -DartifactId=flink-sql-connector-hive-las-formation-3_2.12 -Dversion=1.16-byted-connector-SNAPSHOT -Dpackaging=jar

3.2 DataStream 示例

该作业为写 LAS Catalog 表的示例作业,主要包含两部分:

  • 注册 LAS Catalog,需要传入和 SQL Connector 相同的参数,如 lfRegion、lfThriftUris 等,具体参数含义见上述参数表格。
  • 通过 Table API 写入到 LAS 表,其中 LAS 表通过Catalog.Database.Table三段式访问。
package com.bigdata;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatagenToLASFormationDemo2 {
    private static final Logger LOG = LoggerFactory.getLogger(DatagenToLASFormationDemo2.class);

    public static void main(String[] args) throws Exception {
        // 填写 LF 相关的参数
        String lfRegion = "cn-beijing";
        String lfThriftUris = "thrift://lakeformation.las.cn-beijing.ivolces.com:48869";
        Boolean lfMetastoreIsPublicCloud = true;
        String lfAccessKey = "xxx";
        String lfSecretKey = "xxx";
        String hiveDatabase = "lf_db";
        String hiveTable = "lf_table";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 构造 Elements Source
        DataStream<String> elementStream = env.fromElements("a", "b", "c");

        // 注册 LF Catalog
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String catalogName = "lf_catalog";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDatabase, null, null, "3.1.3-with-lf3",
                true, lfRegion, lfThriftUris, lfMetastoreIsPublicCloud, lfAccessKey, lfSecretKey);
        tableEnv.registerCatalog(catalogName, hiveCatalog);

        // 把上游 DataStream 转为 Table API 的 View,使得可以在 Table API 中访问
        tableEnv.createTemporaryView("sourceTable", elementStream);

        // 通过 Table API 写入到 LF 表
        // 注意这里写入时在 LF 表后面通过 option hint 注入了一个动态参数,该参数为分区提交策略,必须设置,比如这里设置为提交到 LF 的 metastore,以及 tos 的 success-file
        String insertSql = "insert into " + catalogName + "." + hiveDatabase + "." + hiveTable + " /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */ select * from sourceTable";
        tableEnv.executeSql(insertSql);
    }
}

3.3 依赖的 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.bigdata</groupId>
  <artifactId>flink-datastream-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-las-formation-3_${scala.binary.version}</artifactId>
    <version>1.16-byted-connector-SNAPSHOT</version>
  </dependency>
</dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <id>shade-flink</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <includes>
                  <include>*:*</include>
                </includes>
              </artifactSet>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

4. FAQ

4.1 如何查询其它 catalog 的数据

要访问 LAS Catalog 的其它数据目录下的表,比如数据目录名称是 other_catalog,则需要在 Flink SQL,在三段式访问 LAS Catalog 表时,把 Database 设置为 other_catalog.database 的方式,示例如下:

SELECT * FROM lf_catalog.`other_catalog.lf_db_test`.parquet_partition_table;