ByteHouse 云数仓版 Spark Connector 连接器专门用于通过 Spark 将数据加载到 ByteHouse 云数仓版。
本文将介绍通过 Spark SQL ,以及 EMR 支持的 Servless Spark 两种方式连接ByteHouse并处理数据。
请按照下面的方法,在程序中配置以下驱动的依赖项。
对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。
<dependency> <groupId>com.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.4.6</version> </dependency> <dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>driver-java</artifactId> <version>1.1.58</version> <classifier>all</classifier> </dependency> <dependency> <groupId>com.bytedance.bytehouse</groupId> <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId> <version>0.8.0.6</version> </dependency>
然后,将以下存储库添加到 pom.xml
文件:
<repository> <id>bytedance</id> <name>ByteDance Public Repository</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository>
您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。
请注意替换:
export
字段中的 CLICKHOUSE_HOST
和 CLICKHOUSE_PASSWORD
(即API key
)的值,您可以参见 获取 ByteHouse 连接信息来获取;export
字段中的数据库名CLICKHOUSE_DATABASE
和计算组名CLICKHOUSE_VW
。--jars
配置中实际使用的 jar 实际文件路径。export SPARK_LOCAL_IP=localhost export SPARK_HOME=/opt/tiger/spark export CLICKHOUSE_HOST=<your_HOST> export CLICKHOUSE_PASSWORD=<your-api-key> export CLICKHOUSE_DATABASE=<database> export CLICKHOUSE_VW=<your-virtual-warehouse> export BYTEHOUSE_VW_ID=<your-virtual-warehouse-id> $SPARK_HOME/bin/spark-sql \ --conf spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog \ --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \ --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \ --conf spark.sql.catalog.clickhouse.database=${CLICKHOUSE_DATABASE} \ --conf spark.sql.catalog.clickhouse.option.vw=${CLICKHOUSE_VW} \ --conf spark.driver.bindAddress=127.0.0.1 \ --conf spark.sql.catalog.clickhouse.option.virtual_warehouse=${BYTEHOUSE_VW_ID} \ --conf spark.sql.catalog.clickhouse.option.is_cnch=true \ --conf spark.sql.catalog.clickhouse.gateway_version=v2 \ --conf spark.bytehouse.write.batchSize=50000 \ --conf spark.bytehouse.write.format=jdbc \ --jars clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.44.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.6.jar
Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。
您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。
请注意替换:
set
中的 vpc 网络信息。您可以在 ByteHouse控制台 租户管理-基本信息 页面的最下方查询到此信息。set
中的 AK/SK(access key、secret key)信息,您可参见 获取Access Key。set
字段中的公网或私网 HOST
地址,以及API key
,您可以参见 获取 ByteHouse 连接信息来获取;同时设置 database 数据库名和VW 计算组名。set serverless.spark.analysis=true; set spark.hadoop.fs.tos.skip.resolve = true; set las.cross.vpc.access.enabled = true; set las.cross.vpc.vpc.id = <your-vpc-id>; -- 使用可用区B子网 set las.cross.vpc.subnet.id = <your-subnet-id>; set las.cross.vpc.security.group.id = <your-securitygroup-id>; set las.cross.vpc.accountId=<your-account-id>; set las.cluster.type = vke; set emr.serverless.spark.only.parse.enabled = true; set serverless.spark.access.key=<your-tos-ak>; set serverless.spark.secret.key=<your-tos-sk>; set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog; -- cdw 内网连接地址 set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-cn-beijing.bytehouse.ivolces.com; -- 公网连接地址 -- set spark.sql.catalog.clickhouse.host=bytehouse-cn-beijing.volces.com; set spark.sql.catalog.clickhouse.protocol=http; set spark.sql.catalog.clickhouse.password=<your-api-key>; set spark.sql.catalog.clickhouse.database=<database>; set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>; -- gateway v2 set spark.sql.catalog.clickhouse.option.virtual_warehouse=${your-virtual-warehouse-id}; set spark.sql.catalog.clickhouse.gateway_version=v2; set spark.sql.catalog.clickhouse.option.is_cnch=true; set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-1.1.49-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.6.jar"}]; use clickhouse; -- 查询 hive -- select * from spark_catalog.hive_db.hive_table; -- select * from people_t_copy; insert into people_t_copy select * from people_t;
参数 | 是否必选 | 默认值 | 数据类型 | 描述 | 起始版本 |
---|---|---|---|---|---|
spark.sql.catalog.${catalog}.host | ✅ | string | 例如 bytehouse-cn-beijing.volces.com, | ||
spark.sql.catalog.${catalog}.http_port | 19000 | int | |||
spark.sql.catalog.${catalog}.protocol | http | string | |||
spark.sql.catalog.${catalog}.user | bytehouse | string | |||
spark.sql.catalog.${catalog}.password | ✅ | string | 参阅 获取 API Token。 | ||
spark.sql.catalog.${catalog}.database | default | string | |||
spark.sql.catalog.${catalog}.gateway_version | v1 | string | v1或v2 | 0.8.0.3 | |
spark.sql.catalog.${catalog}.timezone | server | string | server, client, UTC+3, Asia/Shanghai, etc. | ||
spark.sql.catalog.${catalog}.dedup_key_mode | string | throw, append, replace, ignore | 0.8.0.3 | ||
spark.sql.catalog.${catalog}.option | ✅ | map | 其中 vw 参数是必选项,配置如下: 如果是 v2网关,需要增加下面的配置项 | ||
spark.bytehouse.write.batchSize | 10000 | int | |||
spark.bytehouse.write.maxRetry | 3 | int | 如果写入失败时,最大重试次数 | 0.8.0.3 | |
spark.bytehouse.write.retryInterval | 10s | time | 重试时间间隔 | 0.8.0.3 |
场景 | 是否支持 | Spark SQL 语法 | ByteHouse 云数仓版 SQL 语法 |
---|---|---|---|
listTables | ✅ | show tables | SHOW TABLES FROM |
loadTable | ✅ | show create table xx | SHOW TABLES FROM |
createTable | ❌ | CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid] | |
dropTable | ✅ | drop table | DROP TABLE |
renameTable | ✅ | rename table xx to xxx | RENAME TABLE |
listNamespaces | ✅ | show databases | SHOW DATABASES |
createNamespace | ✅ | create database xx | CREATE DATABASE |
dropNamespace | ✅ | drop database xx | DROP DATABASE |
数据类型 | 读取 | 写入 | |
---|---|---|---|
Integer types | UInt8 | ✅ | ✅ |
UInt16 | ✅ | ✅ | |
UInt32 | ✅ | ✅ | |
UInt64 | ✅ | ✅ | |
Int8 | ✅ | ✅ | |
Int16 | ✅ | ✅ | |
Int32 | ✅ | ✅ | |
Int64 | ✅ | ✅ | |
Floating-point numbers | Float32 | ✅ | ✅ |
Float64 | ✅ | ✅ | |
Decimal | ✅ | ✅ | |
Boolean | bool | ✅ | ✅ |
Strings | String | ✅ | ✅ |
FixedString | ✅ | ✅ | |
Dates | Date | ✅ | ✅ |
DateTime | ✅ | ✅ | |
UUID | UUID | ✅ | ✅ |
Enum | Enum8 | ✅ | ✅ |
Enum16 | ✅ | ✅ | |
Arrays | Array(T) | ✅ | ✅ |
Maps | Map(K, V) | ✅ | ✅ |