You need to enable JavaScript to run this app.
导航
Spark Connector Driver
最近更新时间:2025.03.17 17:45:57首次发布时间:2024.08.23 11:10:49
我的收藏
有用
有用
无用
无用

ByteHouse 云数仓版 Spark Connector 连接器专门用于通过 Spark 将数据加载到 ByteHouse 云数仓版。本文将介绍通过 Spark SQL,以及 EMR 支持的 Servless Spark 两种方式连接 ByteHouse 并处理数据。

限制条件
  • Java 8
  • Scala 2.12 及以上版本
  • Spark 3.3 或 3.5

驱动安装

请按照下面的方法,在程序中配置以下驱动的依赖项。

  1. ByteHouse Spark Connector。
  2. ClickHouse JDBC 驱动程序。
  3. ByteHouse JDBC 驱动程序。

Maven 依赖

对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。

  1. 添加如下依赖。

    Spark 版本

    依赖

    Spark 3.3

    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.4.6</version>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>driver-java</artifactId>
        <version>1.1.58</version>
        <classifier>all</classifier>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>clickhouse-spark-runtime-3.3_2.12</artifactId>
        <version>0.8.0.6</version>
    </dependency>
    

    Spark 3.5

    <dependency>
        <groupId>com.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.4.6</version>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>driver-java</artifactId>
        <version>1.1.58</version>
        <classifier>all</classifier>
    </dependency>
    <dependency>
        <groupId>com.bytedance.bytehouse</groupId>
        <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId>
        <version>0.8.0.10</version>
    </dependency>
    
  2. 将以下存储库添加到 pom.xml 文件:

    <repository>
        <id>bytedance</id>
        <name>ByteDance Public Repository</name>
        <url>https://artifact.bytedance.com/repository/releases</url>
    </repository>
    

使用说明

Spark SQL

您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。

export SPARK_LOCAL_IP=localhost
export SPARK_HOME=/opt/tiger/spark
export CLICKHOUSE_HOST=bytehouse-cn-beijing.volces.com
export CLICKHOUSE_PASSWORD=<your-api-key>
export CLICKHOUSE_DATABASE=<database>
export CLICKHOUSE_VW=<your-virtual-warehouse>

$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=${CLICKHOUSE_DATABASE} \
  --conf spark.sql.catalog.clickhouse.option.vw=${CLICKHOUSE_VW} \
  --conf spark.driver.bindAddress=127.0.0.1 \
  --conf spark.bytehouse.write.batchSize=50000 \
  --conf spark.bytehouse.write.format=jdbc \
  --jars clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.58.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.10.jar

参数

配置说明

--conf

  • CLICKHOUSE_HOSTCLICKHOUSE_PASSWORD(即 API key)的值,您可以参见 获取 ByteHouse 连接信息来获取。
  • CLICKHOUSE_DATABASECLICKHOUSE_VW 分别配置为 ByteHouse 的数据库名和计算组名。

--jars

配置为实际使用的 jar 文件路径。

其他

更多连接参数的配置说明请参见下文的配置参数章节。

Servless Spark

Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。

set serverless.spark.analysis=true;
set spark.hadoop.fs.tos.skip.resolve = true;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = <your-vpc-id>;
-- 使用可用区 B 子网
set las.cross.vpc.subnet.id = <your-subnet-id>;
set las.cross.vpc.security.group.id = <your-securitygroup-id>;
set las.cross.vpc.accountId=<your-account-id>;
set las.cluster.type = vke;
set emr.serverless.spark.only.parse.enabled  = true;
set serverless.spark.access.key=<your-tos-ak>;
set serverless.spark.secret.key=<your-tos-sk>;
set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
-- 私网连接地址
set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-cn-beijing.bytehouse.ivolces.com;
-- 公网连接地址
-- set spark.sql.catalog.clickhouse.host=bytehouse-cn-beijing.volces.com;
set spark.sql.catalog.clickhouse.protocol=http;
set spark.sql.catalog.clickhouse.password=<your-api-key>;
set spark.sql.catalog.clickhouse.database=<database>;
set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;

set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-1.1.58-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.10.jar"}];

use clickhouse;

-- 查询 hive
-- select * from spark_catalog.hive_db.hive_table;
-- select * from people_t_copy;
insert into people_t_copy select * from people_t; 

参数

配置说明

las.cross.vpc

配置为 VPC 网络信息,您可以在 ByteHouse 控制台 租户管理-基本信息 页面的最下方查询到此信息。

注意

配置子网时,需配置为可用区 B 的子网。

serverless.spark.access.keyserverless.spark.secret.key

配置为 AK/SK(access key、secret key),您可参见 获取 Access Key

spark.sql.catalog.clickhouse

las.spark.jar.depend.jars

配置为实际使用的 jar 的 tos 路径。

其他

更多连接参数的配置说明请参见下文的配置参数章节。

配置参数

参数

是否必选

默认值

数据类型

描述

起始版本

spark.sql.catalog.${catalog}.host

无默认值

string

例如 bytehouse-cn-beijing.volces.com,有关更多信息,请参阅 支持的地域及可用区

0.8.0.2

spark.sql.catalog.${catalog}.http_port

19000

int

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.protocol

http

string

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.user

bytehouse

string

保持默认值即可。

0.8.0.2

spark.sql.catalog.${catalog}.password

无默认值

string

配置为 ByteHouse 的 API Key,您可以参考获取 API Key章节获取 API Key 信息。

0.8.0.2

spark.sql.catalog.${catalog}.database

default

string

配置默认数据库名,配置后,后续使用 Spark SQL 时,查询此数据库中的表数据时可接使用表名来查询,无需使用*数据库名.表名*的格式。

0.8.0.2

spark.sql.catalog.${catalog}.gateway_version

auto

string

v1, v2 或 auto。若网关为 auto,则自动选择 v1 或 v2。

0.8.0.10

spark.sql.catalog.${catalog}.timezone

server

string

使用的时区,支持配置为:server, client, UTC+3, Asia/Shanghai 等。

0.8.0.2

spark.sql.catalog.${catalog}.dedup_key_mode

无默认值

string

处理数据中的重复键值的策略,支持:

  • throw
  • append
  • replace
  • ignore

0.8.0.3

spark.sql.catalog.${catalog}.option

无默认值

map

其中 vw 参数是必选项,配置如下:
spark.sql.catalog.${catalog}.option.vw=xx

0.8.0.10

spark.bytehouse.write.batchSize

10000

int

0.8.0.2

spark.bytehouse.write.maxRetry

30

int

写入失败时,最大重试次数

0.8.0.3

spark.bytehouse.write.retryInterval

10s

time

写入失败重试时间间隔

0.8.0.3

spark.bytehouse.write.retryExponentialBackoffEnabled

true

boolean

enables exponential backoff for retry intervals during write operations

0.8.0.9

spark.bytehouse.write.sharding-strategy

无默认值

string

ClickHouse 分布式分片策略。支持值为:

  • NONE:不要分区写入。
  • HASH:shard_key 的哈希写入。

0.8.0.9

spark.bytehouse.write.sharding-key

无默认值

string

哈希分区键,可以由多个字段组成,用逗号分隔。

0.8.0.9

spark.bytehouse.write.sharding-expression

无默认值

string

哈希分区的表达式。如果设置了哈希分区的表达式,则所有涉及的字段名也必须列在 sharding-key

0.8.0.9

spark.bytehouse.read.byPartition

无默认值

boolean

如果为 true,将按分区拆分查询

0.8.0.9

spark.bytehouse.read.partition.lower

无默认值

string

按分区扫描时的下限范围(包含下限值)。

0.8.0.9

spark.bytehouse.read.partition.upper

无默认值

string

按分区扫描时的上限范围(包含上限值)。

0.8.0.9

spark.bytehouse.read.byBucket

无默认值

boolean

如果为 true,将按 bucket 拆分查询

0.8.0.9

spark.bytehouse.read.maxRetry

30

int

读取失败时最大重试次数

0.8.0.9

spark.bytehouse.read.retryInterval

10s

time

读取失败重试时间间隔

0.8.0.9

spark.bytehouse.read.retryExponentialBackoffEnabled

true

boolean

在读操作期间为重试间隔启用指数退避,即配置为 true 后,读取操作遇到错误或失败时,系统将按照指数增长的时间间隔来进行重试。

0.8.0.9

参考:支持场景

场景

是否支持

Spark SQL 语法

ByteHouse 云数仓版 SQL 语法

listTables

show tables

SHOW TABLES FROM xx

loadTable

show create table xx

SHOW TABLES FROM xx LIKE xxx

createTable

CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid]
(
[tableColumnDfnt],
[CONSTRAINT constraint_name CHECK columnExpr,]
...
) [engineClause]

dropTable

drop table xx.xxx

DROP TABLE xx.xxx

renameTable

rename table xx to xxx

RENAME TABLE xx.xxx to xx.xxx

listNamespaces

show databases

SHOW DATABASES

createNamespace

create database xx

CREATE DATABASE xx [with properties ('engine'='cnch')]

dropNamespace

drop database xx

DROP DATABASE xx

参考:支持的数据类型

数据类型

读取

写入

Integer types

UInt8

UInt16

UInt32

UInt64

Int8

Int16

Int32

Int64

Floating-point numbers

Float32

Float64

Decimal

Decimal32

Decimal64

Boolean

bool

Strings

String

FixedString

Dates

Date

DateTime

DateTime64

UUID

UUID

Enum

Enum8

Enum16

Arrays

Array(T)

Maps

Map(K, V)