You need to enable JavaScript to run this app.
导航
Spark Connector
最近更新时间:2025.01.07 14:18:58首次发布时间:2024.09.20 18:02:26

Spark Connector for ByteHouse 连接器专门用于通过 Spark 将数据加载到 ByteHouse 企业版。本文为您介绍如何使用Spark Connector连接ByteHouse,并以Spark SQL 以及 EMR 支持的 Servless Spark 两种方式为例为您详细示例如何连接ByteHouse并处理数据。

准备工作

下载驱动

  • Spark Connector

    spark 版本

    驱动程序

    发布日期

    3.5及以上版本

    clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar
    未知大小

    2024-09-11

  • ByteHouse JDBC

    clickhouse-jdbc-0.4.6.jar
    未知大小

添加Maven依赖

对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.6</version>
</dependency>
<dependency>
    <groupId>com.bytedance.bytehouse-ce</groupId>
    <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId>
    <version>0.8.1.1</version>
</dependency>

然后,将以下存储库添加到 pom.xml文件:

<repository>
    <id>bytedance</id>
    <name>ByteDance Public Repository</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
</repository>

使用说明

Spark SQL

您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。

export SPARK_LOCAL_IP=localhost
export SPARK_HOME=/opt/tiger/spark
export CLICKHOUSE_HOST=<your-host>
export CLICKHOUSE_HTTP_PORT=8123
export CLICKHOUSE_USER=<your-username>
export CLICKHOUSE_PASSWORD='<your-password>'

$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.protocol=http \
  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=default \
  --conf spark.sql.catalog.clickhouse.shard-discovery.kind=CE_API_CLUSTERS \
  --conf spark.sql.catalog.clickhouse.bytehouse-ce.api.account-id=xx \
  --conf spark.sql.catalog.clickhouse.clickhouse.cluster=xx \
  --conf spark.driver.bindAddress=127.0.0.1 \
  --conf spark.clickhouse.write.distributed.convertLocal=true \
  --conf spark.clickhouse.write.useShardsWriter=true \
  --conf spark.clickhouse.write.format=json \
  --conf spark.clickhouse.read.filterByPartition=false \
  --jars clickhouse-jdbc-0.4.6-all.jar,clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar

其中关键连接参数为:

参数

配置说明

export

CLICKHOUSE_HOSTCLICKHOUSE_USERCLICKHOUSE_PASSWORD

配置为ByteHouse的连接域名、连接用户名和密码,获取方式可参见获取集群连接信息

CLICKHOUSE_DATABASE

配置为ByteHouse的数据库名。

--jars

配置中实际使用的 jar 实际文件路径。

其他spark相关参数

请参见下文的配置参数章节。

Servless Spark

您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。

注意

通过此方式连接ByteHouse时,您需要先手动下载驱动,并上传到ByteHouse的同地域的 TOS bucket中,用于下方Connector调用。

set serverless.spark.analysis=true;
set spark.hadoop.fs.tos.skip.resolve = true;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = <your-vpc-id>;
-- 使用可用区B子网
set las.cross.vpc.subnet.id = <your-subnet-id>;
set las.cross.vpc.security.group.id = <your-securitygroup-id>;
set las.cross.vpc.accountId=<your-account-id>;
set las.cluster.type = vke;
set emr.serverless.spark.only.parse.enabled  = true;
set serverless.spark.access.key=<your-tos-ak>;
set serverless.spark.secret.key=<your-tos-sk>;
set spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog;
set spark.sql.catalog.clickhouse.host=xx.bytehouse-ce.ivolces.com;
set spark.sql.catalog.clickhouse.protocol=http;
set spark.sql.catalog.clickhouse.http_port=8123;
set spark.sql.catalog.clickhouse.user=<your-username>;
set spark.sql.catalog.clickhouse.password=<your-password>;
set spark.sql.catalog.clickhouse.database=<your-database>;
set spark.sql.catalog.clickhouse.shard_discovery.kind=CE_API_CLUSTERS;
set spark.sql.catalog.clickhouse.bytehouse_ce.api.account_id=<your-account-id>;
set spark.sql.catalog.clickhouse.cluster=<your-cluster-name>;
set spark.sql.catalog.clickhouse.option.socket_timeout=300000;
set spark.sql.catalog.clickhouse.option.connect_timeout=30000;
set spark.sql.catalog.clickhouse.option.custom_settings=max_execution_time=3000;
set spark.clickhouse.write.format=json;
set spark.clickhouse.read.filterByPartition=false;
set spark.clickhouse.write.distributed.convertLocal=true;
set spark.clickhouse.write.useShardsWriter=true;

set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/clickhouse-spark-runtime-3.5_2.12-0.8.1.1.jar"}];

-- 从 hive 的 people_t 导入 bh 的 people_t_copy 表
use clickhouse;
insert into people_t_copy select * from spark_catalog.default.people_t; 

其中关键连接参数为:

参数

配置说明

las.cross.vpc.**相关参数

您可以在 ByteHouse控制台 集群管理-集群详情-基本信息 页面查询到此信息。

serverless.spark.access.keyserverless.spark.secret.key

您可参见 获取Access Key

clickhouse.host
clickhouse.user
clickhouse.password

公网或私网 HOST 地址,以及账户密码信息,您可以参见 获取集群连接信息

clickhouse.database
clickhouse.cluster

设置 database 数据库名和集群名。

las.spark.jar.depend.jars

填写实际使用的驱动 jar 文件的 TOS 路径。

其他spark相关参数

请参见下文的配置参数章节。

配置参数

参数

是否必选

默认值

数据类型

描述

spark.sql.catalog.${catalog}

string

需配置为固定值:xenon.clickhouse.ClickHouseCatalog

spark.sql.catalog.${catalog}.host

string

ByteHouse集群连接地址,格式为: xx.bytehouse-ce.ivolces.com,获取连接地址可参见 获取集群连接信息

spark.sql.catalog.${catalog}.http_port

8123

int

ByteHouse HTTP连接端口

spark.sql.catalog.${catalog}.protocol

http

string

ByteHouse HTTP连接协议

spark.sql.catalog.${catalog}.user

string

ByteHouse连接的用户名

spark.sql.catalog.${catalog}.password

string

ByteHouse连接的密码

spark.sql.catalog.${catalog}.database

default

string

ByteHouse的数据库名

spark.sql.catalog.${catalog}.timezone

server

string

时区信息,可选值:

  • server
  • client
  • UTC+3
  • Asia/Shanghai等标准时区

spark.sql.catalog.${catalog}.shard_discovery.kind

string

需配置为固定值:CE_API_CLUSTERS

spark.sql.catalog.${catalog}.shard_discovery.host

string

默认为 spark.sql.catalog.${catalog}.host

spark.sql.catalog.${catalog}.shard_discovery.port

80

int

分片发现主机端口

spark.sql.catalog.${catalog}.bytehouse_ce.api.account_id

火山引擎主账号ID

spark.sql.catalog.${catalog}.cluster

string

集群名称

spark.sql.catalog.${catalog}.option

map

JDBC参数

spark.bytehouse.write.batchSize

10000

int

JDBC写入批处理大小

spark.clickhouse.write.repartitionByPartition

true

bool

写入前是否通过ClickHouse分区键来重新划分数据以满足ClickHouse分布

spark.clickhouse.write.distributed.useClusterNodes

true

bool

写分布式表时写入集群所有节点

spark.clickhouse.write.distributed.convertLocal

false

bool

当写分布式表时,改为写本地表

spark.clickhouse.read.distributed.convertLocal

true

bool

当读取分布式表时,改为读取本地表

spark.clickhouse.read.filterByPartition

true

bool

读取表时,按_partition_id或分区值过滤

spark.clickhouse.write.localSortByPartition

true

bool

如果为true,写入前按分区做本地排序

spark.clickhouse.write.localSortByKey

true

bool

如果为true,写入前通过排序键做本地排序

spark.clickhouse.write.format

json

string

当前仅支持JSON格式

spark.clickhouse.write.useShardsWriter

false

string

如果为true,则在写入分片之前缓存分片中的行

spark.clickhouse.write.shardingStrategy

AUTO

string

分片策略,支持HASH、AUTO

spark.clickhouse.write.shardingExpression

string

分片表达式,支持cityHash64(xx)、intHash64(xx) 和 sipHash64(xx)

spark.clickhouse.read.format

json

string

当前仅支持JSON格式

数据类型映射

Spark数据类型

ByteHouse数据类型

读取

写入

Integer types

UInt8

UInt16

UInt32

UInt64

Int8

Int16

Int32

Int64

Floating-point numbers

Float32

Float64

Decimal

Boolean

bool

Strings

String

FixedString

Dates

Date

DateTime

UUID

UUID

Enum

Enum8

Enum16

Arrays

Array(T)

Maps

Map(K, V)

支持的Spark Catalog函数

Spark Catalog函数

是否支持

Spark SQL 语法

ByteHouse 企业版 SQL 语法

listTables

show tables

SHOW TABLES

loadTable

show create table xx

SHOW CREATE TABLE xx

createTable

CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid]
(
    [tableColumnDfnt],
    [CONSTRAINT constraint_name CHECK columnExpr,]
    ...
) [engineClause]

dropTable

drop table xx.xxx

DROP TABLE xx.xxx

renameTable

rename table xx to xxx

RENAME TABLE xx.xxx to xx.xxx

listNamespaces

show databases

SHOW DATABASES

createNamespace

create database xx

CREATE DATABASE xx

dropNamespace

drop database xx

DROP DATABASE xx