The SQL client shipped with Apache Flink supports writing SQL, debugging, and submitting Flink jobs to a Flink cluster; for general usage, refer to the official Flink documentation.
This article additionally walks through several usage scenarios for Flink on Volcano Engine E-MapReduce (EMR).
With Flink on Volcano Engine EMR, the SQL client can submit Flink SQL jobs to either a standalone cluster or a Yarn cluster.
In the standalone scenario, a standalone cluster must be started first. Run the following command under the FLINK_HOME directory (by default /usr/lib/emr/current/flink) to create it:
./bin/start-cluster.sh
Once the standalone cluster is up, start the SQL client CLI with:
./bin/sql-client.sh embedded
To stop the standalone cluster, run:
./bin/stop-cluster.sh
In the Yarn cluster scenario, several Flink submission modes are supported: Yarn Session mode, Per-Job Cluster mode, and Application mode.
Note that the Flink SQL Client does not currently support Application mode.
In Session mode, first start a Yarn session with:
./bin/yarn-session.sh -d
When the Yarn session starts successfully, a /tmp/.yarn-properties-root file is created that records the application ID of the most recent Yarn submission. Start the SQL client CLI with the following command; any Flink SQL you run afterwards will be submitted to the Yarn session application started above.
./bin/sql-client.sh embedded -s yarn-session
To stop the running Yarn session, execute:
cat /tmp/.yarn-properties-root | grep applicationID | cut -d'=' -f 2 | xargs -I {} yarn application -kill {}
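The command above relies on the applicationID entry written to /tmp/.yarn-properties-root. As a rough sketch, the file's contents look like this (the application ID value below is illustrative):

# /tmp/.yarn-properties-root (sample contents)
applicationID=application_1669359883000_0001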
Per-Job Cluster mode does not require starting a cluster in advance. After launching the SQL client CLI, set execution.target; every Flink SQL job submitted afterwards is then submitted to Yarn as an independent job.
Note
The yarn-per-job mode has been marked as deprecated since Flink 1.16.
./bin/sql-client.sh embedded

Flink SQL> set execution.target=yarn-per-job;
[INFO] Session property has been set.
Alternatively, this parameter can be predefined in the flink-conf.yaml file:
# flink-conf.yaml
execution.target: yarn-per-job
Flink on Volcano Engine EMR supports several ways to read from and write to TOS object storage, for example via the Hive Connector or the Hudi Connector.
The following uses yarn-session mode as an example to demonstrate how to integrate the Hive Connector.
./bin/yarn-session.sh -d
./bin/sql-client.sh embedded -s yarn-session -j connectors/flink-sql-connector-hive-3.1.2_2.12-1.16.1.jar
Flink SQL> CREATE CATALOG hive WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = '/etc/emr/hive/conf'
> );
[INFO] Execute statement succeed.

Flink SQL> use catalog hive;
[INFO] Execute statement succeed.

Flink SQL> create database demo_db;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+---------------+
| database name |
+---------------+
|       default |
|       demo_db |
+---------------+
3 rows in set
Hive tables can be created with other engines such as Spark or Hive, or you can switch the Flink SQL client to Hive dialect mode.
# Start the Spark SQL interactive CLI
spark-sql

spark-sql> CREATE TABLE demo_tbl1 (
> uuid STRING,
> name STRING,
> age INT,
> ts TIMESTAMP
> )
> PARTITIONED BY (`partition` STRING);
Time taken: 0.652 seconds

spark-sql> desc formatted demo_tbl1;
uuid                    string
name                    string
age                     int
ts                      timestamp
partition               string
# Partition Information
# col_name              data_type               comment
partition               string
# Detailed Table Information
Database                demo_db
Table                   demo_tbl1
Created Time            Fri Nov 25 15:04:43 CST 2022
Last Access             UNKNOWN
Created By              Spark 3.2.1
Type                    MANAGED
Provider                hive
Comment
Table Properties        [bucketing_version=2, sink.partition-commit.policy.kind=metastore, transient_lastDdlTime=1669359883]
Location                tos://xxxxx-v2/hms-warehouse/demo_db.db/demo_tbl1
Serde Library           org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat             org.apache.hadoop.mapred.TextInputFormat
OutputFormat            org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
Storage Properties      [serialization.format=1]
Partition Provider      Catalog
Time taken: 0.085 seconds, Fetched 25 row(s)
Starting from Flink 1.16, using Hive dialect mode requires adjusting some of Flink's system jar dependencies; see the official documentation for details. Run the following commands to prepare the jars:
# Run from FLINK_HOME
mv lib/flink-table-planner-loader-1.16.1.jar opt/
cp opt/flink-table-planner_2.12-1.16.1.jar lib/
cp connectors/flink-sql-connector-hive-3.1.2_2.12-1.16.1.jar lib/
After adjusting the jar dependencies, restart the yarn-session and the sql-client. The table is created as follows:
./bin/sql-client.sh embedded -s yarn-session

Flink SQL> set table.sql-dialect=hive;
[INFO] Session property has been set.

Flink SQL> CREATE CATALOG hive WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = '/etc/emr/hive/conf'
> );
[INFO] Execute statement succeed.

Flink SQL> use catalog hive;
[INFO] Execute statement succeed.

CREATE TABLE `hive`.`demo_db`.`demo_tbl2` (
  uuid STRING,
  name STRING,
  age INT,
  ts TIMESTAMP
)
PARTITIONED BY (`partition` STRING)
LOCATION 'tos://{bucket_name}/hms-warehouse/demo_db.db/demo_tbl2'
TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore'
);
When you are not using the Hive dialect, it is recommended not to put connector jar dependencies in the lib directory.
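When the Hive dialect is no longer needed, the jar adjustments made in the earlier step can be reverted roughly as follows (a sketch that assumes the files were moved and copied exactly as shown above):

# Run from FLINK_HOME; reverts the Hive dialect jar adjustments shown earlier
rm lib/flink-sql-connector-hive-3.1.2_2.12-1.16.1.jar
rm lib/flink-table-planner_2.12-1.16.1.jar
mv opt/flink-table-planner-loader-1.16.1.jar lib/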
Flink SQL> SET 'execution.runtime-mode' = 'batch';  # recommended to set this in flink-conf.yaml
[INFO] Session property has been set.

Flink SQL> INSERT INTO `hive`.`demo_db`.`demo_tbl1` VALUES
> ('id21','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
> ('id22','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
> ('id23','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
> ('id24','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id25','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
> ('id26','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
> ('id27','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
> ('id28','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';  # recommended to set this in flink-conf.yaml
[INFO] Session property has been set.

Flink SQL> select * from `hive`.`demo_db`.`demo_tbl1`;
+------+---------+-----+-------------------------------+-----------+
| uuid |    name | age |                            ts | partition |
+------+---------+-----+-------------------------------+-----------+
| id27 |     Bob |  44 | 1970-01-01 00:00:07.000000000 |      par4 |
| id26 |    Emma |  20 | 1970-01-01 00:00:06.000000000 |      par3 |
| id22 | Stephen |  33 | 1970-01-01 00:00:02.000000000 |      par1 |
| id25 |  Sophia |  18 | 1970-01-01 00:00:05.000000000 |      par3 |
| id21 |   Danny |  23 | 1970-01-01 00:00:01.000000000 |      par1 |
| id28 |     Han |  56 | 1970-01-01 00:00:08.000000000 |      par4 |
| id23 |  Julian |  53 | 1970-01-01 00:00:03.000000000 |      par2 |
| id24 |  Fabian |  31 | 1970-01-01 00:00:04.000000000 |      par2 |
+------+---------+-----+-------------------------------+-----------+
8 rows in set
When writing to a partitioned table in streaming mode, a partition commit policy must be set so that downstream consumers are notified when a partition has been fully written and can be read. Non-partitioned tables do not need this setting; it can also be set in the table properties at table creation time.
# Switch to streaming mode
Flink SQL> SET 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

# For a partitioned table, modify the table properties (or set this property when creating the table)
Flink SQL> ALTER TABLE `hive`.`demo_db`.`demo_tbl1` SET ('sink.partition-commit.policy.kind'='metastore');
[INFO] Execute statement succeed.

Flink SQL> INSERT INTO `hive`.`demo_db`.`demo_tbl1` VALUES
> ('id11','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
> ('id12','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
> ('id13','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
> ('id14','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
> ('id15','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
> ('id16','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
> ('id17','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
> ('id18','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
[INFO] Submitting SQL update statement to the cluster...

Flink SQL> select * from `hive`.`demo_db`.`demo_tbl1`;
+----+------+---------+-----+-------------------------------+-----------+
| op | uuid |    name | age |                            ts | partition |
+----+------+---------+-----+-------------------------------+-----------+
| +I | id14 |  Fabian |  31 | 1970-01-01 00:00:04.000000000 |      par2 |
| +I | id28 |     Han |  56 | 1970-01-01 00:00:08.000000000 |      par4 |
| +I | id26 |    Emma |  20 | 1970-01-01 00:00:06.000000000 |      par3 |
| +I | id13 |  Julian |  53 | 1970-01-01 00:00:03.000000000 |      par2 |
| +I | id12 | Stephen |  33 | 1970-01-01 00:00:02.000000000 |      par1 |
| +I | id15 |  Sophia |  18 | 1970-01-01 00:00:05.000000000 |      par3 |
| +I | id18 |     Han |  56 | 1970-01-01 00:00:08.000000000 |      par4 |
| +I | id11 |   Danny |  23 | 1970-01-01 00:00:01.000000000 |      par1 |
| +I | id27 |     Bob |  44 | 1970-01-01 00:00:07.000000000 |      par4 |
| +I | id22 | Stephen |  33 | 1970-01-01 00:00:02.000000000 |      par1 |
| +I | id17 |     Bob |  44 | 1970-01-01 00:00:07.000000000 |      par4 |
| +I | id25 |  Sophia |  18 | 1970-01-01 00:00:05.000000000 |      par3 |
| +I | id24 |  Fabian |  31 | 1970-01-01 00:00:04.000000000 |      par2 |
| +I | id16 |    Emma |  20 | 1970-01-01 00:00:06.000000000 |      par3 |
| +I | id23 |  Julian |  53 | 1970-01-01 00:00:03.000000000 |      par2 |
| +I | id21 |   Danny |  23 | 1970-01-01 00:00:01.000000000 |      par1 |
+----+------+---------+-----+-------------------------------+-----------+
Received a total of 16 rows
Flink can integrate directly with Hudi to read from and write to TOS and synchronize the metadata to HMS, so that other engines can query it. The following uses Yarn per-job mode as an example to demonstrate the integration.
Integrating Flink with Hudi requires the hudi-flink-bundle jar; on EMR clusters with Hudi enabled, this bundle is already provided by default.
./bin/sql-client.sh embedded -j connectors/flink-sql-connector-hive-3.1.2_2.12-1.16.1.jar
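If the hudi-flink-bundle jar is not already on the SQL client classpath (for example, on a cluster without Hudi enabled), it can also be passed explicitly with an additional -j option. This is only a sketch; the bundle path below is a placeholder and the actual file name depends on the Hudi/EMR version installed:

./bin/sql-client.sh embedded \
  -j connectors/flink-sql-connector-hive-3.1.2_2.12-1.16.1.jar \
  -j <path-to-hudi-flink-bundle-jar>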
The example below creates a COW table and persists the Hudi table created by Flink in HMS. For details on the Flink-Hudi integration, refer to the Hudi official documentation; for details on enabling Hive sync in the Flink-Hudi integration, also refer to the Hudi official documentation.
CREATE CATALOG hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/emr/hive/conf'
);

CREATE TABLE `hive`.`demo_db`.`flink_hudi_cow_tbl_hms_sync_demo_original` (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'tos://{bucket_name}/hms-warehouse/demo_db.db/flink_hudi_cow_tbl_hms_sync_demo',
  'table.type' = 'COPY_ON_WRITE',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://{ip}:{port}',
  'hive_sync.table'='flink_hudi_cow_tbl_hms_sync_demo',
  'hive_sync.db'='demo_db'
);
Write data into the COW table. After the write succeeds, the metadata is synchronized to the table specified by hive_sync.table. In the COW case only the target table is created; in the MOR case an RO table and an RT table are created in addition to the target table (see the MOR sketch after the Spark verification below).
INSERT INTO `hive`.`demo_db`.`flink_hudi_cow_tbl_hms_sync_demo_original` VALUES
  ('id41','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id42','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id43','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id44','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id45','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id46','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id47','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id48','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

Flink SQL> show tables;
+--------------------------------------------+
|                                 table name |
+--------------------------------------------+
|           flink_hudi_cow_tbl_hms_sync_demo |
|  flink_hudi_cow_tbl_hms_sync_demo_original |
+--------------------------------------------+

Flink SQL> select * from `hive`.`demo_db`.`flink_hudi_cow_tbl_hms_sync_demo_original`;
+----+------+---------+-----+-------------------------+-----------+
| op | uuid |    name | age |                      ts | partition |
+----+------+---------+-----+-------------------------+-----------+
| +I | id38 |     Han |  56 | 1970-01-01 00:00:08.000 |      par4 |
| +I | id37 |     Bob |  44 | 1970-01-01 00:00:07.000 |      par4 |
| +I | id31 |   Danny |  23 | 1970-01-01 00:00:01.000 |      par1 |
| +I | id32 | Stephen |  33 | 1970-01-01 00:00:02.000 |      par1 |
| +I | id35 |  Sophia |  18 | 1970-01-01 00:00:05.000 |      par3 |
| +I | id36 |    Emma |  20 | 1970-01-01 00:00:06.000 |      par3 |
| +I | id33 |  Julian |  53 | 1970-01-01 00:00:03.000 |      par2 |
| +I | id34 |  Fabian |  31 | 1970-01-01 00:00:04.000 |      par2 |
+----+------+---------+-----+-------------------------+-----------+
spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

spark-sql> use demo_db;
spark-sql> show tables;
flink_hudi_cow_tbl_hms_sync_demo
flink_hudi_cow_tbl_hms_sync_demo_original
Time taken: 0.072 seconds, Fetched 2 row(s)

spark-sql> select * from flink_hudi_cow_tbl_hms_sync_demo;
20221125181029843  20221125181029843_2_0  id31  par1  1bde4613-4260-4349-bf4a-23a4a5d35c77_2-4-0_20221126092842836.parquet  id31  Danny    23  1970-01-01 08:00:01  par1
20221125181029843  20221125181029843_2_1  id32  par1  1bde4613-4260-4349-bf4a-23a4a5d35c77_2-4-0_20221126092842836.parquet  id32  Stephen  33  1970-01-01 08:00:02  par1
20221126092842836  20221126092842836_2_2  id42  par1  1bde4613-4260-4349-bf4a-23a4a5d35c77_2-4-0_20221126092842836.parquet  id42  Stephen  33  1970-01-01 08:00:02  par1
20221126092842836  20221126092842836_2_3  id41  par1  1bde4613-4260-4349-bf4a-23a4a5d35c77_2-4-0_20221126092842836.parquet  id41  Danny    23  1970-01-01 08:00:01  par1
20221125181029843  20221125181029843_1_0  id33  par2  4bd429d0-6af4-47c5-829f-887568c015f7_1-4-0_20221126092842836.parquet  id33  Julian   53  1970-01-01 08:00:03  par2
20221125181029843  20221125181029843_1_1  id34  par2  4bd429d0-6af4-47c5-829f-887568c015f7_1-4-0_20221126092842836.parquet  id34  Fabian   31  1970-01-01 08:00:04  par2
20221126092842836  20221126092842836_1_2  id44  par2  4bd429d0-6af4-47c5-829f-887568c015f7_1-4-0_20221126092842836.parquet  id44  Fabian   31  1970-01-01 08:00:04  par2
20221126092842836  20221126092842836_1_3  id43  par2  4bd429d0-6af4-47c5-829f-887568c015f7_1-4-0_20221126092842836.parquet  id43  Julian   53  1970-01-01 08:00:03  par2
20221125181029843  20221125181029843_0_0  id35  par3  8f4f152c-898c-44f1-aeea-4d8e158a24ff_0-4-0_20221126092842836.parquet  id35  Sophia   18  1970-01-01 08:00:05  par3
20221125181029843  20221125181029843_0_1  id36  par3  8f4f152c-898c-44f1-aeea-4d8e158a24ff_0-4-0_20221126092842836.parquet  id36  Emma     20  1970-01-01 08:00:06  par3
20221126092842836  20221126092842836_0_2  id46  par3  8f4f152c-898c-44f1-aeea-4d8e158a24ff_0-4-0_20221126092842836.parquet  id46  Emma     20  1970-01-01 08:00:06  par3
20221126092842836  20221126092842836_0_3  id45  par3  8f4f152c-898c-44f1-aeea-4d8e158a24ff_0-4-0_20221126092842836.parquet  id45  Sophia   18  1970-01-01 08:00:05  par3
20221125181029843  20221125181029843_3_0  id38  par4  24b3d004-5111-4ecd-8bb3-aa8ee80d5f7a_3-4-0_20221126092842836.parquet  id38  Han      56  1970-01-01 08:00:08  par4
20221125181029843  20221125181029843_3_1  id37  par4  24b3d004-5111-4ecd-8bb3-aa8ee80d5f7a_3-4-0_20221126092842836.parquet  id37  Bob      44  1970-01-01 08:00:07  par4
20221126092842836  20221126092842836_3_2  id48  par4  24b3d004-5111-4ecd-8bb3-aa8ee80d5f7a_3-4-0_20221126092842836.parquet  id48  Han      56  1970-01-01 08:00:08  par4
20221126092842836  20221126092842836_3_3  id47  par4  24b3d004-5111-4ecd-8bb3-aa8ee80d5f7a_3-4-0_20221126092842836.parquet  id47  Bob      44  1970-01-01 08:00:07  par4
Time taken: 7.876 seconds, Fetched 16 row(s)
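The example above uses a COW table. As mentioned earlier, for a MOR table Hive sync additionally registers RO and RT tables in HMS. A MOR variant of the table definition could look roughly like the following sketch (only table.type changes; the path and sync options are illustrative and follow the same pattern as the COW example):

-- Hypothetical MOR variant of the Hudi table definition above
CREATE TABLE `hive`.`demo_db`.`flink_hudi_mor_tbl_hms_sync_demo_original` (
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'tos://{bucket_name}/hms-warehouse/demo_db.db/flink_hudi_mor_tbl_hms_sync_demo',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://{ip}:{port}',
  'hive_sync.table'='flink_hudi_mor_tbl_hms_sync_demo',
  'hive_sync.db'='demo_db'
);
-- After data is written, Hive sync would also register the _ro and _rt suffixed tables in HMS alongside the target table.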
Flink on Volcano Engine EMR can write data to Doris through the Flink Doris Connector, which is bundled in the connectors directory. The following uses Yarn per-job mode as an example to demonstrate reading and writing Doris with Flink.
# Note: the connector jar version differs across EMR versions; adjust to the actual connector version
/usr/lib/emr/current/flink/bin/sql-client.sh embedded \
  -j connectors/flink-doris-connector-1.16-1.3.0-ve-1.jar

set execution.target=yarn-per-job;
Refer to the EMR Doris basic usage documentation to create the corresponding user account.
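For reference, a minimal sketch of creating the user used in the examples below, executed against the Doris FE via the MySQL client (test_user/test_passwd are the illustrative credentials from this article; adjust the granted privileges to your own requirements):

-- Connect as an admin user: mysql -hxxx.xxx.xx -P9030 -uroot -p
CREATE USER 'test_user'@'%' IDENTIFIED BY 'test_passwd';
-- Grant read and load privileges on the demo database (run after demo_db has been created below)
GRANT SELECT_PRIV, LOAD_PRIV ON demo_db.* TO 'test_user'@'%';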
-- mysql -hxxx.xxx.xx -P9030 -uxxxx -p
CREATE DATABASE demo_db;
USE demo_db;
CREATE TABLE all_employees_info (
  emp_no int NOT NULL,
  birth_date date,
  first_name varchar(20),
  last_name varchar(20),
  gender char(2),
  hire_date date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
);
-- Create the Doris sink table
CREATE TABLE doris_sink (
  emp_no int,
  birth_date STRING,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date STRING
)
WITH (
  'connector' = 'doris',
  'fenodes' = '192.168.1.41:8030',
  'table.identifier' = 'demo_db.all_employees_info',
  'username' = 'test_user',
  'password' = 'test_passwd',
  'sink.properties.two_phase_commit'='true',
  'sink.label-prefix'='doris_demo_emp_001'
);

Flink SQL> INSERT INTO `doris_sink` VALUES
  (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
  (10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
  (10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
  (10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
  (10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
  (10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
  (10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
  (10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
  (10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
  (10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24');
-- Create the Doris source table
Flink SQL> CREATE TABLE doris_source (
  emp_no int,
  birth_date DATE,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date DATE
)
WITH (
  'connector' = 'doris',
  'fenodes' = '192.168.1.41:8030',
  'table.identifier' = 'demo_db.all_employees_info',
  'username' = 'test_user',
  'password' = 'test_passwd',
  'doris.read.field' = 'emp_no,birth_date,first_name,last_name,gender,hire_date'
);

-- Read the data
Flink SQL> select * from doris_source;
+----+--------+------------+------------+-----------+--------+------------+
| op | emp_no | birth_date | first_name | last_name | gender |  hire_date |
+----+--------+------------+------------+-----------+--------+------------+
| +I |  10001 | 1953-09-02 |     Georgi |   Facello |      M | 1986-06-26 |
| +I |  10002 | 1964-06-02 |    Bezalel |    Simmel |      F | 1985-11-21 |
| +I |  10003 | 1959-12-03 |      Parto |   Bamford |      M | 1986-08-28 |
| +I |  10004 | 1954-05-01 |  Chirstian |   Koblick |      M | 1986-12-01 |
| +I |  10005 | 1955-01-21 |    Kyoichi |  Maliniak |      M | 1989-09-12 |
| +I |  10006 | 1953-04-20 |     Anneke |   Preusig |      F | 1989-06-02 |
| +I |  10007 | 1957-05-23 |    Tzvetan | Zielinski |      F | 1989-02-10 |
| +I |  10008 | 1958-02-19 |     Saniya |  Kalloufi |      M | 1994-09-15 |
| +I |  10009 | 1952-04-19 |     Sumant |      Peac |      F | 1985-02-18 |
| +I |  10010 | 1963-06-01 |  Duangkaew |  Piveteau |      F | 1989-08-24 |
+----+--------+------------+------------+-----------+--------+------------+
Received a total of 10 rows
Similar to the Doris Connector, Flink on Volcano Engine EMR can write data to StarRocks through the Flink StarRocks Connector, which is bundled in the connectors directory. The following uses Yarn per-job mode as an example to demonstrate reading and writing StarRocks with Flink.
# Note: the connector jar version differs across EMR versions; adjust to the actual connector version
/usr/lib/emr/current/flink/bin/sql-client.sh embedded \
  -j connectors/flink-starrocks-connector-1.16-1.2.5-ve-1.jar

set execution.target=yarn-per-job;
Refer to the EMR StarRocks basic usage documentation to create the corresponding user account.
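For reference, a minimal sketch of creating the user used below, executed through the MySQL client against the StarRocks FE (the exact GRANT syntax depends on your StarRocks version, so check the linked documentation; test_user/test_passwd are the illustrative credentials from this article):

-- Connect as an admin user: mysql -hxxx.xxx.xx -P9030 -uroot -p
CREATE USER 'test_user'@'%' IDENTIFIED BY 'test_passwd';
-- Doris-style privilege syntax used by earlier StarRocks releases; newer releases use GRANT SELECT/INSERT ... TO USER ...
GRANT SELECT_PRIV, LOAD_PRIV ON demo_db.* TO 'test_user'@'%';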
-- mysql -hxxx.xxx.xx -P9030 -uxxxx -p
mysql> CREATE DATABASE demo_db;
mysql> USE demo_db;
mysql> CREATE TABLE all_employees_info (
  emp_no int NOT NULL,
  birth_date date,
  first_name varchar(20),
  last_name varchar(20),
  gender char(2),
  hire_date date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
  "replication_allocation" = "tag.location.default: 1"
);
-- Create the sink table
Flink SQL> CREATE TABLE sr_sink (
  emp_no int,
  birth_date STRING,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date STRING
)
WITH (
  'connector' = 'starrocks',
  'load-url' = '192.168.1.26:8030',
  'jdbc-url' = 'jdbc:mysql://192.168.1.26:9030',
  'database-name' = 'demo_db',
  'table-name' = 'all_employees_info',
  'username' = 'test_user',
  'password' = 'test_passwd'
);

Flink SQL> INSERT INTO `sr_sink` VALUES
  (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
  (10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
  (10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
  (10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
  (10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
  (10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
  (10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
  (10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
  (10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
  (10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24');
-- Create the source table
Flink SQL> CREATE TABLE sr_source (
  emp_no int,
  birth_date STRING,
  first_name STRING,
  last_name STRING,
  gender STRING,
  hire_date STRING
)
WITH (
  'connector' = 'starrocks',
  'scan-url' = '192.168.1.26:8030',
  'jdbc-url' = 'jdbc:mysql://192.168.1.26:9030',
  'database-name' = 'demo_db',
  'table-name' = 'all_employees_info',
  'username' = 'test_user',
  'password' = 'test_passwd'
);

Flink SQL> SELECT * FROM sr_source;
+----+--------+------------+------------+-----------+--------+------------+
| op | emp_no | birth_date | first_name | last_name | gender |  hire_date |
+----+--------+------------+------------+-----------+--------+------------+
| +I |  10001 | 1953-09-02 |     Georgi |   Facello |      M | 1986-06-26 |
| +I |  10002 | 1964-06-02 |    Bezalel |    Simmel |      F | 1985-11-21 |
| +I |  10003 | 1959-12-03 |      Parto |   Bamford |      M | 1986-08-28 |
| +I |  10004 | 1954-05-01 |  Chirstian |   Koblick |      M | 1986-12-01 |
| +I |  10005 | 1955-01-21 |    Kyoichi |  Maliniak |      M | 1989-09-12 |
| +I |  10006 | 1953-04-20 |     Anneke |   Preusig |      F | 1989-06-02 |
| +I |  10007 | 1957-05-23 |    Tzvetan | Zielinski |      F | 1989-02-10 |
| +I |  10008 | 1958-02-19 |     Saniya |  Kalloufi |      M | 1994-09-15 |
| +I |  10009 | 1952-04-19 |     Sumant |      Peac |      F | 1985-02-18 |
| +I |  10010 | 1963-06-01 |  Duangkaew |  Piveteau |      F | 1989-08-24 |
+----+--------+------------+------------+-----------+--------+------------+
Received a total of 10 rows
Volcano Engine ByteHouse (Cloud Data Warehouse edition, CDW) can be written to from Flink through the Flink ByteHouse-CDW Connector, which is bundled in the connectors directory. The following uses Yarn per-job mode as an example to write data generated by datagen into ByteHouse CDW.
Refer to ByteHouse CDW / Quick Start to create the ByteHouse cluster and the table; the database and table creation statements are as follows:
create database flink_bytehouse_test;
CREATE TABLE `flink_bytehouse_test`.`cnch_table_test` (
  test_key STRING,
  test_value UInt64,
  ts UInt64
)
ENGINE = CnchMergeTree
ORDER BY (test_key);
Start the SQL client CLI:
# Note: the connector jar version differs across EMR versions; adjust to the actual connector version
/usr/lib/emr/current/flink/bin/sql-client.sh embedded \
  -j connectors/flink-sql-connector-bytehouse-cdw_2.12-1.25.4-1.16.jar

set execution.target=yarn-per-job;
In Flink, create a ByteHouse CDW sink table and insert data generated by the datagen connector into the ByteHouse CDW table:
CREATE TABLE random_source (
  test_key STRING,
  test_value BIGINT,
  ts BIGINT
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE cnch_table (
  test_key STRING,
  test_value BIGINT,
  ts BIGINT
)
WITH (
  'connector' = 'bytehouse-cdw',
  'database' = 'flink_bytehouse_test',
  'table-name' = 'cnch_table_test',
  'bytehouse.gateway.region' = 'VOLCANO',
  'bytehouse.gateway.access-key-id' = '<your actual access key (AK)>',
  'bytehouse.gateway.secret-key' = '<your actual secret key (SK)>'
);

INSERT INTO cnch_table SELECT * FROM random_source;
Once the job is running, verify the ingested row count on the ByteHouse side:
select count(*) from `flink_bytehouse_test`.`cnch_table_test`;
Similar to ByteHouse CDW, Volcano Engine ByteHouse (Enterprise edition, CE) can be written to from Flink through the Flink ByteHouse-CE Connector, which is bundled in the connectors directory (built in since EMR 1.8.0). The following uses Yarn per-job mode as an example to write data generated by datagen into ByteHouse CE.
/usr/lib/emr/current/flink/bin/sql-client.sh embedded \
  -j connectors/flink-sql-connector-bytehouse-ce_2.12-1.25.4-1.16.jar

set execution.target=yarn-per-job;
CREATE TABLE random_source (
  test_key STRING,
  test_value BIGINT,
  ts BIGINT
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

CREATE TABLE cnch_table (
  test_key STRING,
  test_value BIGINT,
  ts BIGINT
)
WITH (
  'connector' = 'bytehouse-ce',
  'clickhouse.shard-discovery.kind' = 'CE_GATEWAY',
  'bytehouse.ce.gateway.host' = '7322262036938936630.bytehouse-ce.ivolces.com',
  'bytehouse.ce.gateway.port' = '8123',
  'sink.enable-upsert' = 'false',
  'clickhouse.cluster' = 'bytehouse_cluster_enct',  -- ByteHouse cluster name
  'database' = 'default',                           -- target database
  'table-name' = 'cnch_table_test',                 -- target table; note: use the local table, i.e. {table_name}_local
  'username' = 'xxxx@bytedance.com',                -- ByteHouse user
  'password' = 'xxx'                                -- ByteHouse password
);

INSERT INTO cnch_table SELECT * FROM random_source;
Likewise, verify the ingested row count on the ByteHouse CE side:
select count(*) FROM `default`.`cnch_table_test`;