You need to enable JavaScript to run this app.
流式计算 Flink版

流式计算 Flink版

复制全文
Catalog 参考
LAS Catalog
复制全文
LAS Catalog

LAS Catalog 通过提供的统一元数据管理、数据访问控制、元数据发现和数据集成等关键功能。LAS Catalog 连接器提供对 LAS Catalog 表的读写能力。

1. 使用限制

  • 需要确保开通 LAS Catalog 统一元数据管理服务,可以参考 LAS Catalog 开通文档
  • LAS Catalog 连接器仅支持在 Flink 1.16-volcano 及以上引擎版本中使用

2. 使用步骤

2.1 LAS 建表

在 LAS Catalog 平台上创建数据目录、数据库和数据表,详细参见 LAS Catalog 文档LAS 建表文档。另外要确保 LAS Catalog 为 Flink 用户开通合理的权限。
注意事项:
LAS 默认的数据目录 hive 的存储位置是 files://,是一个虚拟路径,需要修改为一个实际的 tos:// 路径,否则会由于路径不对,导致写入报错。如下所示:
Image

2.2 创建 LAS Catalog

前置条件:因为 Flink 同步 LAS 元数据,需要通过 API 接口访问。需要

  1. 使用 API 密钥管理为子账号创建 AccessKey / AccessKeySecret (后续需要填写在 WTIH 参数中)。
  2. 在权限管理中为子账号开通 LASFullAccess 权限。

在 Flink 里创建 LAS Catalog,当前 LAS Catalog 是基于 Hive Catalog 扩展实现的,在使用上需要传递 LAS Catalog 所需的参数,以创建对应的 HMS client。具体创建 Catalog 的语句示例如下:

CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    -- 以下根据区域进行调整 
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    -- 填写相关子账号 AK/SK,需要在控制台创建
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx',
    -- 指定 LAS 数据目录,默认是 hive
    'catalog.properties.metastore.catalog.default' = 'hive'
  );

Catalog 各参数解释如下:

参数

是否必选

默认值

数据类型

描述

type

hive

String

Catalog 类型是 Hive。

is-lf

false

Boolean

设置为 true,表示访问 LAS Catalog 元信息;设置为 false,表示访问开源 HMS。

hive-version

String

访问 LAS Catalog 元信息时,需要设置为 3.1.3-with-lf3。

hive.client.las.region.name

String

LAS Catalog 所在的区域,如下:

  • 华北:cn-beijing
  • 华东:cn-shanghai
  • 华南:cn-guangzhou
  • 柔佛:ap-southeast-1

hive.metastore.uris

String

LAS Catalog 的 metastore 地址,和所在的区域一一对应:

  • 华北:thrift://lakeformation.las.cn-beijing.ivolces.com:48869
  • 华东:thrift://lakeformation.las.cn-shanghai.ivolces.com:48869
  • 华南:thrift://lakeformation.las.cn-guangzhou.ivolces.com:48869
  • 柔佛:thrift://lakeformation.las.ap-southeast-1.ivolces.com:48869

hive.hms.client.is.public.cloud

false

Boolean

设置为 true,表示访问 LAS Catalog 元信息;设置为 false,表示访问开源 HMS。

hive.client.las.ak

String

火山账号的 access key

hive.client.las.sk

String

火山账号的 secret key

catalog.properties.*

String

透传任意的 hive conf 配置项,只要是合法的 hive conf,都可以通过该前缀的方式传入。已有的 hive conf 见官方文档

catalog.properties.metastore.catalog.default

hive

String

设置访问 LAS 元数据的数据目录,默认是 hive,如果要指定其他数据目录,则通过该参数手动指定即可。

2.3 读写 LAS Catalog 表

创建 Catalog 之后,在 Flink SQL 中,数据表通过Catalog.Database.Table三段式来表示,可以直接进行和 Hive 表一样的读写操作,包括批读、批写和流写三种模式。

2.3.1 批读

-- 创建 LAS Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

CREATE TABLE print_sink (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN,
  `day` STRING,
  `hour` STRING
) WITH (
     'connector' = 'print',
     'print-identifier' = 'out'            
);

-- 批读 LAS Catalog 表
INSERT INTO print_sink
SELECT * FROM lf_catalog.lf_db_test.parquet_partition_table;

2.3.2 批写

-- 创建 LAS Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

-- 插入新数据到非分区表 
INSERT INTO lf_catalog.lf_db_test.mytable SELECT 'Tom', 25; 
-- 覆盖写入非分区表 
INSERT OVERWRITE lf_catalog.lf_db_test.mytable SELECT 'Tom', 25; 
-- 插入新数据到分区表 
INSERT INTO lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25; 
-- 覆盖写入分区表 
INSERT OVERWRITE lf_catalog.lf_db_test.myparttable PARTITION (`date`='${date}') SELECT 'Tom', 25;

2.3.3 流写

-- 创建 LAS Catalog
CREATE CATALOG lf_catalog
WITH
  (
    'type' = 'hive',
    'is-lf' = 'true',
    'hive-version' = '3.1.3-with-lf3',
    'hive.client.las.region.name' = 'cn-beijing',
    'hive.metastore.uris' = 'thrift://lakeformation.las.cn-beijing.ivolces.com:48869',
    'hive.hms.client.is.public.cloud' = 'true',
    'hive.client.las.ak' = 'xxx',
    'hive.client.las.sk' = 'xxx'
  );

CREATE TABLE datagen_source (
  a STRING,
  b INT,
  c DOUBLE,
  d BOOLEAN
)
WITH
(
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

-- 流写 LAS Catalog 表
INSERT INTO lf_catalog.lf_db_test.parquet_partition_table 
  /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */
SELECT
  a, b, c, d,
  cast(current_date as string) as `day`,
  cast(hour(current_timestamp) as string) as `hour`
FROM
  datagen_source;

3. DataStream Connector 使用步骤

3.1 加载 LAS Catalog Connector 依赖

Connector Jar 包如下:

flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT (4).jar
未知大小

下载到本地后,通过 maven install 安装 LAS Catalog Connector 到本地:

mvn install:install-file -Dfile=flink-sql-connector-hive-las-formation-3_2.12-1.16-byted-connector-SNAPSHOT.jar -DgroupId=org.apache.flink -DartifactId=flink-sql-connector-hive-las-formation-3_2.12 -Dversion=1.16-byted-connector-SNAPSHOT -Dpackaging=jar

3.2 DataStream 示例

该作业为写 LAS Catalog 表的示例作业,主要包含两部分:

  • 注册 LAS Catalog,需要传入和 SQL Connector 相同的参数,如 lfRegion、lfThriftUris 等,具体参数含义见上述参数表格。
  • 通过 Table API 写入到 LAS 表,其中 LAS 表通过Catalog.Database.Table三段式访问。
package com.bigdata;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatagenToLASFormationDemo2 {
    private static final Logger LOG = LoggerFactory.getLogger(DatagenToLASFormationDemo2.class);

    public static void main(String[] args) throws Exception {
        // 填写 LF 相关的参数
        String lfRegion = "cn-beijing";
        String lfThriftUris = "thrift://lakeformation.las.cn-beijing.ivolces.com:48869";
        Boolean lfMetastoreIsPublicCloud = true;
        String lfAccessKey = "xxx";
        String lfSecretKey = "xxx";
        String hiveDatabase = "lf_db";
        String hiveTable = "lf_table";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // 构造 Elements Source
        DataStream<String> elementStream = env.fromElements("a", "b", "c");

        // 注册 LF Catalog
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        String catalogName = "lf_catalog";
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, hiveDatabase, null, null, "3.1.3-with-lf3",
                true, lfRegion, lfThriftUris, lfMetastoreIsPublicCloud, lfAccessKey, lfSecretKey);
        tableEnv.registerCatalog(catalogName, hiveCatalog);

        // 把上游 DataStream 转为 Table API 的 View,使得可以在 Table API 中访问
        tableEnv.createTemporaryView("sourceTable", elementStream);

        // 通过 Table API 写入到 LF 表
        // 注意这里写入时在 LF 表后面通过 option hint 注入了一个动态参数,该参数为分区提交策略,必须设置,比如这里设置为提交到 LF 的 metastore,以及 tos 的 success-file
        String insertSql = "insert into " + catalogName + "." + hiveDatabase + "." + hiveTable + " /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */ select * from sourceTable";
        tableEnv.executeSql(insertSql);
    }
}

3.3 依赖的 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.bigdata</groupId>
  <artifactId>flink-datastream-demo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
  </properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-las-formation-3_${scala.binary.version}</artifactId>
    <version>1.16-byted-connector-SNAPSHOT</version>
  </dependency>
</dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <id>shade-flink</id>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <includes>
                  <include>*:*</include>
                </includes>
              </artifactSet>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

4. FAQ

现象是 Flink 写 LAS 作业不报错,但是无法写入数据。该情况可能是 LAS 建表的存储路径设置有问题。
排查步骤:

  1. 检查 LAS 建表的存储路径是否设置正确。需要进入 AI 数据湖服务页面,查看该表的信息:

Image

  1. 检查 Flink 写入 LAS 的数据是否写入到 LAS 建表的存储路径。点击上述存储位置,跳转到 TOS 页面,查看目录下是否有分区和数据文件:

Image

最近更新时间:2025.09.04 11:30:15
这个页面对您有帮助吗?
有用
有用
无用
无用