目前火山引擎 E-MapReduce(EMR3.9.0)集群中,Paimon支持的版本为0.6-release。
Paimon 在创建 Hadoop 集群过程中作为可选组件安装,集群创建完成后确保 Paimon 组件可见并且状态是正常的。详见创建集群。 如果在集群初始化时没有安装,也可以通过添加 Paimon 组件在已有集群上添加 Paimon。详见添加服务。
引擎 | 版本 | 批读 | 批写 | 创建表 | 修改表 | 流写 | 流读 | 批式覆盖写 |
---|---|---|---|---|---|---|---|---|
Flink | 1.16.1 | ✅ | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ |
Spark | 3.5.0 | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ❌ |
Hive | 3.1.3 | ✅ | ✅ | ✅ | ❌ | ❌ | ❌ | ❌ |
Presto | 0.280 | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ❌ |
拉起Flink session集群
<FLINK_HOME>/bin/yarn-session.sh -d
使用Flink sql-client执行Flink SQL
<FLINK_HOME>/bin/sql-client.sh
在使用Paimon库表之前,需要为Paimon创建Catalog
目前Paimon支持两种类型的catalog:FileSystem或Hive。
FileSystem
这种catalog会将Paimon库表元数据存储在文件系统上,比如HDFS。
用Flink创建FileSystem类型的Catalog的方式如下:
CREATE CATALOG my_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'hdfs://nn:8020/path/to/warehousee' ); USE CATALOG my_catalog;
Hive
这种catalog会将Paimon库表元数据存储在Hive metastore当中,用户将可以通过Hive来访问这些Paimon表。
说明
使用Hive metastore作为Catalog时,依赖Flink Hive bunndle,请确保Flink Hive bundle已被添加至Flink相关依赖中。
用Flink创建HMS类型的Catalog的方式如下:
CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs://nn:8020/path/to/warehouse', 此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。 ); USE CATALOG my_hive;
Primary Key表(主键表)
CREATE TABLE word_count ( word STRING PRIMARY KEY NOT ENFORCED, cnt BIGINT );
Append Only表(非主键表)
CREATE TABLE word_count_nonkey ( word STRING, cnt BIGINT );
-- 首先创建一个流式数据源表 CREATE TEMPORARY TABLE word_table ( word STRING ) WITH ( 'connector' = 'datagen', 'fields.word.length' = '1' ); -- 流模式写入Paimon时需要设置checkpoint间隔 SET 'execution.checkpointing.interval' = '10 s'; -- 将源表流式写入Paimon中 INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;
-- 切换至流模式 SET 'execution.runtime-mode' = 'streaming'; SELECT `interval`, COUNT(*) AS interval_cnt FROM( SELECT cnt / 10000 AS `interval` FROM word_count ) GROUP BY `interval`;
INSERT INTO word_count_nonkey VALUES ('a', 1), ('b', 2), ('c', 3);
-- 切换至批模式 SET 'execution.runtime-mode' = 'batch'; SELECT * FROM word_count_nonkey;
Hive catalog
用Spark创建HMS类型的Catalog的方式如下:
spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ --conf spark.sql.catalog.paimon.metastore=hive \ --conf spark.sql.catalog.paimon.warehouse=hdfs://nn:8020/path/to/warehouse \ #此参数指定warehouse地址,默认为hive-site中hive.metastore.warehouse.dir参数对应的值,也可自定义路径,可支持TOS路径。 --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
使用paimon catalog
use paimon;
create table test_table ( k int, v string ) tblproperties ( 'primary-key' = 'k' );
INSERT INTO test_table VALUES (1, 'Hi'), (2, 'Hello');
SELECT * FROM test_table;
CREATE CATALOG my_hive WITH ( 'type' = 'paimon', 'metastore' = 'hive', -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment -- 'warehouse' = 'hdfs://nn:8020/path/to/table/store/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf ); USE CATALOG my_hive; CREATE TABLE test_table ( a int, b string ); INSERT INTO test_table VALUES (1, 'Table'), (2, 'Store'); SELECT * FROM test_table;
Hive SQL访问已经存在于Hive metastore中的Paimon表
SHOW TABLES; SELECT * FROM test_table; -- tez引擎不支持写入paimon,需要切换至mr引擎 SET hive.execution.engine=mr; INSERT INTO test_table VALUES (3, 'a'); SELECT * FROM test_table;
CREATE TABLE hive_test_table( a INT COMMENT 'The a field', b STRING COMMENT 'The b field' )STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
-- 注意这里的location指向了上面Flink SQL创建的test_table所在地址 CREATE EXTERNAL TABLE external_test_table STORED BY 'org.apache.paimon.hive.PaimonStorageHandler' LOCATION '/path/to/table/store/warehouse/default.db/test_table'; SELECT a, b FROM external_test_table ORDER BY a; INSERT INTO external_test_table VALUES (3, 'Paimon'); SELECT a, b FROM external_test_table ORDER BY a;