Spark Connector for ByteHouse 连接器专门用于通过 Spark 将数据加载到 ByteHouse 企业版。本文为您介绍如何使用Spark Connector连接ByteHouse,并以Spark SQL 以及 EMR 支持的 Servless Spark 两种方式为例为您详细示例如何连接ByteHouse并处理数据。
Spark Connector
spark 版本 | 驱动程序 | 发布日期 |
---|---|---|
3.5及以上版本 | 2024-09-11 |
ByteHouse JDBC
对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。
<dependency> <groupId>com.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.4.6</version> </dependency> <dependency> <groupId>com.bytedance.bytehouse-ce</groupId> <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId> <version>0.8.1.1</version> </dependency>
然后,将以下存储库添加到 pom.xml
文件:
<repository> <id>bytedance</id> <name>ByteDance Public Repository</name> <url>https://artifact.bytedance.com/repository/releases</url> </repository>
您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。
export SPARK_LOCAL_IP=localhost export SPARK_HOME=/opt/tiger/spark export CLICKHOUSE_HOST=<your-host> export CLICKHOUSE_HTTP_PORT=8123 export CLICKHOUSE_USER=<your-username> export CLICKHOUSE_PASSWORD='<your-password>' $SPARK_HOME/bin/spark-sql \ --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \ --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \ --conf spark.sql.catalog.clickhouse.protocol=http \ --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \ --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \ --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \ --conf spark.sql.catalog.clickhouse.database=default \ --conf spark.sql.catalog.clickhouse.shard-discovery.kind=CE_API_CLUSTERS \ --conf spark.sql.catalog.clickhouse.bytehouse-ce.api.account-id=xx \ --conf spark.sql.catalog.clickhouse.clickhouse.cluster=xx \ --conf spark.driver.bindAddress=127.0.0.1 \ --conf spark.clickhouse.write.distributed.convertLocal=true \ --conf spark.clickhouse.write.useShardsWriter=true \ --conf spark.clickhouse.write.format=json \ --conf spark.clickhouse.read.filterByPartition=false \ --jars clickhouse-jdbc-0.4.6-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar
其中关键连接参数为:
参数 | 配置说明 | |
---|---|---|
|
| 配置为ByteHouse的连接域名、连接用户名和密码,获取方式可参见获取集群连接信息。 |
| 配置为ByteHouse的数据库名。 | |
| 配置中实际使用的 jar 实际文件路径。 | |
其他 | 请参见下文的配置参数章节。 |
您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。
注意
通过此方式连接ByteHouse时,您需要先手动下载驱动,并上传到ByteHouse的同地域的 TOS bucket中,用于下方Connector调用。
set serverless.spark.analysis=true; set spark.hadoop.fs.tos.skip.resolve = true; set las.cross.vpc.access.enabled = true; set las.cross.vpc.vpc.id = <your-vpc-id>; -- 使用可用区B子网 set las.cross.vpc.subnet.id = <your-subnet-id>; set las.cross.vpc.security.group.id = <your-securitygroup-id>; set las.cross.vpc.accountId=<your-account-id>; set las.cluster.type = vke; set emr.serverless.spark.only.parse.enabled = true; set serverless.spark.access.key=<your-tos-ak>; set serverless.spark.secret.key=<your-tos-sk>; set spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog; set spark.sql.catalog.clickhouse.host=xx.bytehouse-ce.ivolces.com; set spark.sql.catalog.clickhouse.protocol=http; set spark.sql.catalog.clickhouse.http_port=8123; set spark.sql.catalog.clickhouse.user=<your-username>; set spark.sql.catalog.clickhouse.password=<your-password>; set spark.sql.catalog.clickhouse.database=<your-database>; set spark.sql.catalog.clickhouse.shard_discovery.kind=CE_API_CLUSTERS; set spark.sql.catalog.clickhouse.bytehouse_ce.api.account_id=<your-account-id>; set spark.sql.catalog.clickhouse.cluster=<your-cluster-name>; set spark.sql.catalog.clickhouse.option.socket_timeout=300000; set spark.sql.catalog.clickhouse.option.connect_timeout=30000; set spark.sql.catalog.clickhouse.option.custom_settings=max_execution_time=3000; set spark.clickhouse.write.format=json; set spark.clickhouse.read.filterByPartition=false; set spark.clickhouse.write.distributed.convertLocal=true; set spark.clickhouse.write.useShardsWriter=true; set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar"}]; -- 从 hive 的 people_t 导入 bh 的 people_t_copy 表 use clickhouse; insert into people_t_copy select * from spark_catalog.default.people_t;
其中关键连接参数为:
参数 | 配置说明 |
---|---|
| 您可以在 ByteHouse控制台 集群管理-集群详情-基本信息 页面查询到此信息。 |
| 您可参见 获取Access Key。 |
| 公网或私网 |
| 设置 database 数据库名和集群名。 |
| 填写实际使用的驱动 jar 文件的 TOS 路径。 |
其他 | 请参见下文的配置参数章节。 |
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
spark.sql.catalog.${catalog} | ✅ | 无 | string | 需配置为固定值:xenon.clickhouse.ClickHouseCatalog |
spark.sql.catalog.${catalog}.host | ✅ | 无 | string | ByteHouse集群连接地址,格式为: |
spark.sql.catalog.${catalog}.http_port | 否 | 8123 | int | ByteHouse HTTP连接端口 |
spark.sql.catalog.${catalog}.protocol | 否 | http | string | ByteHouse HTTP连接协议 |
spark.sql.catalog.${catalog}.user | ✅ | 无 | string | ByteHouse连接的用户名 |
spark.sql.catalog.${catalog}.password | ✅ | 无 | string | ByteHouse连接的密码 |
spark.sql.catalog.${catalog}.database | 否 | default | string | ByteHouse的数据库名 |
spark.sql.catalog.${catalog}.timezone | 否 | server | string | 时区信息,可选值:
|
spark.sql.catalog.${catalog}.shard_discovery.kind | ✅ | 无 | string | 需配置为固定值:CE_API_CLUSTERS |
spark.sql.catalog.${catalog}.shard_discovery.host | 否 | 无 | string | 默认为 spark.sql.catalog.${catalog}.host |
spark.sql.catalog.${catalog}.shard_discovery.port | 否 | 80 | int | 分片发现主机端口 |
spark.sql.catalog.${catalog}.bytehouse_ce.api.account_id | ✅ | 无 | 火山引擎主账号ID | |
spark.sql.catalog.${catalog}.cluster | ✅ | 无 | string | 集群名称 |
spark.sql.catalog.${catalog}.option | ✅ | 无 | map | JDBC参数 |
spark.bytehouse.write.batchSize | 否 | 10000 | int | JDBC写入批处理大小 |
spark.clickhouse.write.repartitionByPartition | 否 | true | bool | 写入前是否通过ClickHouse分区键来重新划分数据以满足ClickHouse分布 |
spark.clickhouse.write.distributed.useClusterNodes | 否 | true | bool | 写分布式表时写入集群所有节点 |
spark.clickhouse.write.distributed.convertLocal | 否 | false | bool | 当写分布式表时,改为写本地表 |
spark.clickhouse.read.distributed.convertLocal | 否 | true | bool | 当读取分布式表时,改为读取本地表 |
spark.clickhouse.read.filterByPartition | 否 | true | bool | 读取表时,按_partition_id或分区值过滤 |
spark.clickhouse.write.localSortByPartition | 否 | true | bool | 如果为true,写入前按分区做本地排序 |
spark.clickhouse.write.localSortByKey | 否 | true | bool | 如果为true,写入前通过排序键做本地排序 |
spark.clickhouse.write.format | 否 | json | string | 当前仅支持JSON格式 |
spark.clickhouse.write.useShardsWriter | 否 | false | string | 如果为true,则在写入分片之前缓存分片中的行 |
spark.clickhouse.write.shardingStrategy | 否 | AUTO | string | 分片策略,支持HASH、AUTO |
spark.clickhouse.write.shardingExpression | 否 | 无 | string | 分片表达式,支持cityHash64(xx)、intHash64(xx) 和 sipHash64(xx) |
spark.clickhouse.read.format | 否 | json | string | 当前仅支持JSON格式 |
Spark数据类型 | ByteHouse数据类型 | 读取 | 写入 |
---|---|---|---|
Integer types | UInt8 | ✅ | ✅ |
UInt16 | ✅ | ✅ | |
UInt32 | ✅ | ✅ | |
UInt64 | ✅ | ✅ | |
Int8 | ✅ | ✅ | |
Int16 | ✅ | ✅ | |
Int32 | ✅ | ✅ | |
Int64 | ✅ | ✅ | |
Floating-point numbers | Float32 | ✅ | ✅ |
Float64 | ✅ | ✅ | |
Decimal | ✅ | ✅ | |
Boolean | bool | ✅ | ✅ |
Strings | String | ✅ | ✅ |
FixedString | ✅ | ✅ | |
Dates | Date | ✅ | ✅ |
DateTime | ✅ | ✅ | |
UUID | UUID | ✅ | ✅ |
Enum | Enum8 | ✅ | ✅ |
Enum16 | ✅ | ✅ | |
Arrays | Array(T) | ✅ | ✅ |
Maps | Map(K, V) | ✅ | ✅ |
Spark Catalog函数 | 是否支持 | Spark SQL 语法 | ByteHouse 企业版 SQL 语法 |
---|---|---|---|
listTables | ✅ |
|
|
loadTable | ✅ |
|
|
createTable | ❌ |
| |
dropTable | ✅ |
|
|
renameTable | ✅ |
|
|
listNamespaces | ✅ |
|
|
createNamespace | ✅ |
|
|
dropNamespace | ✅ |
|
|