You need to enable JavaScript to run this app.
导航
Spark Connector Driver
最近更新时间:2024.12.09 15:39:44首次发布时间:2024.08.23 11:10:49

ByteHouse 云数仓版 Spark Connector 连接器专门用于通过 Spark 将数据加载到 ByteHouse 云数仓版。
本文将介绍通过 Spark SQL ,以及 EMR 支持的 Servless Spark 两种方式连接ByteHouse并处理数据。

先决条件

  • Java8
  • Scala 2.12 及以上版本
  • Spark 3.5 及以上版本

驱动安装

请按照下面的方法,在程序中配置以下驱动的依赖项。

  1. ByteHouse Spark Connector。
  2. ClickHouse JDBC 驱动程序。
  3. ByteHouse JDBC 驱动程序。

Maven 依赖

对于要使用 Spark connector 连接器进行编译的 Maven 项目,请将以下依赖项添加到项目的 pom.xml 文件中。

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.6</version>
</dependency>
<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>driver-java</artifactId>
    <version>1.1.58</version>
    <classifier>all</classifier>
</dependency>
<dependency>
    <groupId>com.bytedance.bytehouse</groupId>
    <artifactId>clickhouse-spark-runtime-3.5_2.12</artifactId>
    <version>0.8.0.6</version>
</dependency>

然后,将以下存储库添加到 pom.xml文件:

<repository>
    <id>bytedance</id>
    <name>ByteDance Public Repository</name>
    <url>https://artifact.bytedance.com/repository/releases</url>
</repository>

用法示例

Spark SQL

您可以参考下面的命令,基于 Spark SQL CLI 连接到 ByteHouse。
请注意替换:

  1. 填写 export 字段中的 CLICKHOUSE_HOSTCLICKHOUSE_PASSWORD (即API key)的值,您可以参见 获取 ByteHouse 连接信息来获取;
  2. 填写 export 字段中的数据库名CLICKHOUSE_DATABASE和计算组名CLICKHOUSE_VW
  3. --jars 配置中实际使用的 jar 实际文件路径。
export SPARK_LOCAL_IP=localhost
export SPARK_HOME=/opt/tiger/spark
export CLICKHOUSE_HOST=<your_HOST>
export CLICKHOUSE_PASSWORD=<your-api-key>
export CLICKHOUSE_DATABASE=<database>
export CLICKHOUSE_VW=<your-virtual-warehouse>
export BYTEHOUSE_VW_ID=<your-virtual-warehouse-id>

$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog \
  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
  --conf spark.sql.catalog.clickhouse.database=${CLICKHOUSE_DATABASE} \
  --conf spark.sql.catalog.clickhouse.option.vw=${CLICKHOUSE_VW} \
  --conf spark.driver.bindAddress=127.0.0.1 \
  --conf spark.sql.catalog.clickhouse.option.virtual_warehouse=${BYTEHOUSE_VW_ID} \
  --conf spark.sql.catalog.clickhouse.option.is_cnch=true \
  --conf spark.sql.catalog.clickhouse.gateway_version=v2 \
  --conf spark.bytehouse.write.batchSize=50000 \
  --conf spark.bytehouse.write.format=jdbc \
  --jars clickhouse-jdbc-0.4.6-all.jar,driver-java-1.1.44.jar,clickhouse-spark-runtime-3.5_2.12-0.8.0.6.jar

Servless Spark

Servless Spark 方式适用于火山引擎 EMR(E-MapReduce)服务。
您可以参考下面的命令,基于 Servless Spark 方式连接到 ByteHouse。
请注意替换:

  1. 填写set 中的 vpc 网络信息。您可以在 ByteHouse控制台 租户管理-基本信息 页面的最下方查询到此信息。
  2. 填写set 中的 AK/SK(access key、secret key)信息,您可参见 获取Access Key
  3. 填写 set 字段中的公网或私网 HOST 地址,以及API key,您可以参见 获取 ByteHouse 连接信息来获取;同时设置 database 数据库名和VW 计算组名。
  4. 实际使用的 jar 的 tos 路径。
set serverless.spark.analysis=true;
set spark.hadoop.fs.tos.skip.resolve = true;
set las.cross.vpc.access.enabled = true;
set las.cross.vpc.vpc.id = <your-vpc-id>;
-- 使用可用区B子网
set las.cross.vpc.subnet.id = <your-subnet-id>;
set las.cross.vpc.security.group.id = <your-securitygroup-id>;
set las.cross.vpc.accountId=<your-account-id>;
set las.cluster.type = vke;
set emr.serverless.spark.only.parse.enabled  = true;
set serverless.spark.access.key=<your-tos-ak>;
set serverless.spark.secret.key=<your-tos-sk>;
set spark.sql.catalog.clickhouse=com.bytehouse.ByteHouseCatalog;
-- cdw 内网连接地址
set spark.sql.catalog.clickhouse.host=tenant-<your-account-id>-cn-beijing.bytehouse.ivolces.com;
-- 公网连接地址
-- set spark.sql.catalog.clickhouse.host=bytehouse-cn-beijing.volces.com;
set spark.sql.catalog.clickhouse.protocol=http;
set spark.sql.catalog.clickhouse.password=<your-api-key>;
set spark.sql.catalog.clickhouse.database=<database>;
set spark.sql.catalog.clickhouse.option.vw=<your-virtual-warehouse>;
-- gateway v2
set spark.sql.catalog.clickhouse.option.virtual_warehouse=${your-virtual-warehouse-id};
set spark.sql.catalog.clickhouse.gateway_version=v2;
set spark.sql.catalog.clickhouse.option.is_cnch=true;

set las.spark.jar.depend.jars = [{"schema":"","fileName":"tos://test-bh-tos/cdw/driver-1.1.49-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-jdbc-0.4.6-all.jar"},{"schema":"","fileName":"tos://test-bh-tos/cdw/clickhouse-spark-runtime-3.5_2.12-0.8.0.6.jar"}];

use clickhouse;

-- 查询 hive
-- select * from spark_catalog.hive_db.hive_table;
-- select * from people_t_copy;
insert into people_t_copy select * from people_t; 

配置参数

参数

是否必选

默认值

数据类型

描述

起始版本

spark.sql.catalog.${catalog}.host

string

例如 bytehouse-cn-beijing.volces.com,
有关更多信息,请参阅 支持的地域及可用区

spark.sql.catalog.${catalog}.http_port

19000

int

spark.sql.catalog.${catalog}.protocol

http

string

spark.sql.catalog.${catalog}.user

bytehouse

string

spark.sql.catalog.${catalog}.password

string

参阅 获取 API Token

spark.sql.catalog.${catalog}.database

default

string

spark.sql.catalog.${catalog}.gateway_version

v1

string

v1或v2
若网关为v2,则需要 ByteHouse JDBC 驱动版本>=1.1.54

0.8.0.3

spark.sql.catalog.${catalog}.timezone

server

string

server, client, UTC+3, Asia/Shanghai, etc.

spark.sql.catalog.${catalog}.dedup_key_mode

string

throw, append, replace, ignore

0.8.0.3

spark.sql.catalog.${catalog}.option

map

其中 vw 参数是必选项,配置如下:
spark.sql.catalog.${catalog}.option.vw=xx

如果是 v2网关,需要增加下面的配置项
spark.sql.catalog.${catalog}.option.is_cnch=true

spark.bytehouse.write.batchSize

10000

int

spark.bytehouse.write.maxRetry

3

int

如果写入失败时,最大重试次数

0.8.0.3

spark.bytehouse.write.retryInterval

10s

time

重试时间间隔

0.8.0.3

支持场景

场景

是否支持

Spark SQL 语法

ByteHouse 云数仓版 SQL 语法

listTables

show tables

SHOW TABLES FROM xx

loadTable

show create table xx

SHOW TABLES FROM xx LIKE xxx

createTable

CREATE TABLE [IF NOT EXISTS] [tableIdentifier] [UUID uuid]
(
[tableColumnDfnt],
[CONSTRAINT constraint_name CHECK columnExpr,]
...
) [engineClause]

dropTable

drop table xx.xxx

DROP TABLE xx.xxx

renameTable

rename table xx to xxx

RENAME TABLE xx.xxx to xx.xxx

listNamespaces

show databases

SHOW DATABASES

createNamespace

create database xx

CREATE DATABASE xx [with properties ('engine'='cnch')]

dropNamespace

drop database xx

DROP DATABASE xx

支持的数据类型

数据类型

读取

写入

Integer types

UInt8

UInt16

UInt32

UInt64

Int8

Int16

Int32

Int64

Floating-point numbers

Float32

Float64

Decimal

Boolean

bool

Strings

String

FixedString

Dates

Date

DateTime

UUID

UUID

Enum

Enum8

Enum16

Arrays

Array(T)

Maps

Map(K, V)