You need to enable JavaScript to run this app.
导航
基础使用
最近更新时间:2024.07.03 16:44:10首次发布时间:2024.03.15 11:00:22

目前火山引擎 E-MapReduce(EMR3.9.0)集群中,Paimon支持的版本为0.6-release。

1 Paimon安装

Paimon 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Paimon 组件可见并且状态是正常的。详见创建集群。 如果在集群初始化时没有安装,也可以通过添加 Paimon 组件在已有集群上添加 Paimon。详见添加服务

2 功能矩阵

引擎

版本

批读

批写

创建表

修改表

流写

流读

批式覆盖写

Flink

1.16.1

Spark

3.5.0

Hive

3.1.3

Presto

0.280

拉起Flink session集群

<FLINK_HOME>/bin/yarn-session.sh -d

使用Flink sql-client执行Flink SQL

<FLINK_HOME>/bin/sql-client.sh

3.1 Catalog

在使用Paimon库表之前,需要为Paimon创建Catalog
目前Paimon支持两种类型的catalog:FileSystem或Hive。
FileSystem
这种catalog会将Paimon库表元数据存储在文件系统上,比如HDFS。
用Flink创建FileSystem类型的Catalog的方式如下:

CREATE CATALOG my_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://nn:8020/path/to/warehousee'
);

USE CATALOG my_catalog;

Hive
这种catalog会将Paimon库表元数据存储在Hive metastore当中,用户将可以通过Hive来访问这些Paimon表。

说明

使用Hive metastore作为Catalog时,依赖Flink Hive bunndle,请确保Flink Hive bundle已被添加至Flink相关依赖中。

用Flink创建HMS类型的Catalog的方式如下:

CREATE  CATALOG my_hive  WITH  (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'warehouse' = 'hdfs://nn:8020/path/to/warehouse', 此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
);

USE CATALOG my_hive;

3.2 创建表

Primary Key表(主键表

CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

Append Only表(非主键表)

CREATE TABLE word_count_nonkey (
    word STRING,
    cnt BIGINT
);

3.3 流式写入

-- 首先创建一个流式数据源表
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
     'connector' = 'datagen',
     'fields.word.length' = '1'
);

-- 流模式写入Paimon时需要设置checkpoint间隔
SET 'execution.checkpointing.interval' = '10 s';

-- 将源表流式写入Paimon中
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

3.4 流式读取

-- 切换至流模式
SET 'execution.runtime-mode' = 'streaming';

SELECT `interval`, COUNT(*) AS interval_cnt FROM(
SELECT cnt / 10000 AS `interval` FROM word_count
) GROUP BY `interval`;

3.5 批式写入

INSERT INTO word_count_nonkey VALUES
('a', 1),
('b', 2),
('c', 3);

3.6 批式读取

-- 切换至批模式
SET 'execution.runtime-mode' = 'batch';

SELECT * FROM word_count_nonkey;

4 通过Spark 快速上手

4.1 Catalog

Hive catalog
用Spark创建HMS类型的Catalog的方式如下:

spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.metastore=hive \
    --conf spark.sql.catalog.paimon.warehouse=hdfs://nn:8020/path/to/warehouse \ #此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。
    --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

使用paimon catalog

use paimon;

4.2 创建表

create table test_table (
    k int,
    v string
) tblproperties (
    'primary-key' = 'k'
);

4.3 批式写入

INSERT INTO test_table VALUES (1, 'Hi'), (2, 'Hello');

4.4 批式查询

SELECT * FROM test_table;

5 通过Hive 快速上手

CREATE CATALOG my_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
  -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
  -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
  -- 'warehouse' = 'hdfs://nn:8020/path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
);

USE CATALOG my_hive;

CREATE TABLE test_table (
    a int,
    b string
);

INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store');

SELECT * FROM test_table;

Hive SQL访问已经存在于Hive metastore中的Paimon表

SHOW TABLES;

SELECT * FROM test_table;

-- tez引擎不支持写入paimon,需要切换至mr引擎
SET hive.execution.engine=mr;

INSERT INTO test_table VALUES (3, 'a');

SELECT * FROM test_table;

5.2 创建表

CREATE TABLE hive_test_table(
    a INT COMMENT 'The a field',
    b STRING COMMENT 'The b field'
)STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';

5.3 以外表的方式访问Paimon

-- 注意这里的location指向了上面Flink SQL创建的test_table所在地址
CREATE EXTERNAL TABLE external_test_table
STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION '/path/to/table/store/warehouse/default.db/test_table';

SELECT a, b FROM external_test_table ORDER BY a;

INSERT INTO external_test_table VALUES (3, 'Paimon');

SELECT a, b FROM external_test_table ORDER BY a;