You need to enable JavaScript to run this app.
导航
ClickHouse Python Driver
最近更新时间:2025.06.25 15:55:50首次发布时间:2023.12.18 12:14:26
我的收藏
有用
有用
无用
无用

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。

注意事项
  • ByteHouse 兼容下列开源 ClickHouse Python Driver 程序: mymarilyn/clickhouse-driver (非 Clickhouse 官方驱动,本文已在程序 0.2.9 版本下验证)

    说明

    需要 Python 3.8 或更高版本的支持。

  • 使用 clickhouse-driver 进行数据导入时,Python 侧对数据 chunk 序列化开销比较大,因此不适用于大规模数据写入场景(包括利用 clickhouse-driver 测试 ByteHouse 的写入吞吐性能)。

前提条件

访问 ByteHouse 所需的连接信息,请参考 获取连接信息 获取。

程序安装

安装依赖项

您可能需要安装以下依赖项:

apk add gcc musl-dev

从 PyPI 安装

可以通过如下命令,获取最新发布版本的 clickhouse-driver。

pip3 install clickhouse-driver

从 GitHub 安装

开发版本通过如下命令安装。您可以在程序 Github 主页 获取最新的文档和发布版本信息。

pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver

支持数据类型

Data Types

数据类型

Insert Type

Select Type

[U]Int8/16/32/64/128/256

int, long

int

Float32/64

float, int, long

float

Date/Date32

date, datetime

date

DateTime

datetime, int, long

datetime

String/FixedString(N)

str, bytes

str, bytes

Enum8/16

Enum, int, long, str

str

Array(T)

list, tuple

list

Bool

bool

bool

UUID

str, UUID

str

Decimal

Decimal, float, int, long

Decimal

IPv4/IPv6

IPv4Address/IPv6Address, int, long, str

IPv4Address/IPv6Address

设置连接信息

使用 IAM 用户连接 ByteHouse

可参考下面代码样例设置 ByteHouse 连接信息,通用参数说明如下。

参数

配置要点

Host

配置为 ByteHouse 的网络域名,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息

Port

配置为默认值 19000。

User & Password

  • User:设置为 bytehouse。
  • Password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理>连接信息 中获取 API Key。详情请参见获取 API Key

Database

配置为连接 ByteHouse 的数据库名称。

您可通过传递参数或连接字符串的方式设置 ByteHouse 连接信息。

  • 方式 1:通过传递参数

    from clickhouse_driver import Client
    
    client = Client(
        host="{HOST}",
        port=19000,
        user="bytehouse",
        password="{API_KEY}",
        database="{DATABASE}",
        secure=True,
        verify=False,
    )
    
  • 方式 2:通过连接字符串

    from clickhouse_driver import Client
    
    uri = "clickhouse://bytehouse:{API_KEY}@{HOST}:19000/{DATABASE}?secure=True&verify=False"
    client = Client.from_url(uri)
    

使用数据库用户连接 ByteHouse

可参考下面代码样例设置 ByteHouse 连接信息,通用参数说明如下。

参数

配置要点

Host

配置为 ByteHouse 的网络域名,您可以在 ByteHouse 控制台的 租户管理>基本信息>网络信息 中查看对应信息。详情请参见步骤二:配置网络信息

Port

配置为默认值 8123。

User & Password

  • User 配置为 {accountID_or_accountName}::{username}[::{envID}],详情请参见步骤三:获取 ByteHouse 连接串信息
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名。登录 ByteHouse 数据库账号用户名。可在 ByteHouse 控制台>权限管理>用户>查看数据库用户名
    • envID:可选,指数据库所在的环境名称。如果使用 default 环境,可不配置,如需使用其他环境,需指定环境名称。您可登录 ByteHouse 控制台,在租户管理>基本信息>**当前环境 **中获取。
  • Password:数据库账号的密码由管理员创建数据库账号时自定义配置,您可联系管理员获取密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

Database

配置为连接 ByteHouse 的数据库名称。

您可通过传递参数或连接字符串的方式设置 ByteHouse 连接信息。

  • 方式 1:通过传递参数:

    from clickhouse_driver import Client
    
    client = Client(
        host="{HOST}",
        port=19000,
        user="{accountID_or_accountName}::{username}[::{envID}]",
        password="{password}",
        database="{DATABASE}",
        secure=True,
        verify=False,
    )
    
  • 方式 2:通过连接字符串:

    from clickhouse_driver import Client
    
    uri = "clickhouse://{accountID_or_accountName}::{username}[::{envID}]:{password}@{HOST}:19000/{DATABASE}?secure=True&verify=False"
    client = Client.from_url(uri)
    

使用示例

基本用法示例

请参阅以下基本查询的示例代码。请按照设置连接信息中的信息替换代码中连接配置的字段。

from clickhouse_driver import Client
from datetime import datetime

# 连接至 ByteHouse
client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
    verify=False,
)

# 创建表
create_table_query = """
CREATE TABLE IF NOT EXISTS example_table (
    id Int32,
    name String,
    created_at DateTime
)
ENGINE = CnchMergeTree()
ORDER BY id;
"""
client.execute(create_table_query)

# 插入数据
data = [
    (1, 'Alice', datetime(2024, 11, 20, 10, 30, 0)),
    (2, 'Bob', datetime(2024, 11, 20, 11, 0, 0)),
    (3, 'Charlie', datetime(2024, 11, 20, 12, 15, 0)),
]
client.execute("INSERT INTO example_table (id, name, created_at) VALUES", data)

# 查询数据
rows = client.execute("SELECT * FROM example_table")
for row in rows:
    print(row)

指定服务器设置

如果 ByteHouse 服务器支持某设置(例如virtual_warehouse),而开源驱动包中没有该设置,可以通过参考下面的示例增加。

from clickhouse_driver import Client
from clickhouse_driver.settings.available import settings as available_settings, SettingString

available_settings['virtual_warehouse'] = SettingString

client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
    verify=False,
    settings={"virtual_warehouse": "vw-123456789-vw-name"},
)

指定查询 ID

from clickhouse_driver import Client

# client = Client(...) # Initialize client

query_id = "bbd7dea3-eb63-4a21-b727-f55b420a7223"
client.execute("SELECT 1", query_id=query_id)

从 Pandas dataframe 插入数据

'use_numpy': True 设置需要 Numpy / Pandas 包的支持,可参考下面的示例。

from clickhouse_driver import Client
import pandas as pd

client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
    verify=False,
    settings={"use_numpy": True},
)

client.execute(
   'CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()'
)

df_insert = pd.DataFrame({
    'x': [1, 2],
    'y': ['apple', 'banana'],
}, dtype=object)

client.insert_dataframe('INSERT INTO test_df VALUES', df_insert)

df_select = client.query_dataframe('SELECT * FROM test_df')

print(df_select)

#    x       y
# 0  1   apple
# 1  2  banana

从 CSV 文件插入数据

假设您的 CSV 文件中有以下数据:

time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7

可以通过以下方式将数据插入ByteHouse:

from csv import DictReader
from datetime import datetime

from clickhouse_driver import Client

# client = Client(...) # Initialize client

def iter_csv(filename):
    converters = {
        'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
        'order': str,
        'qty': int,
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


client.execute(
    'CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) Engine = CnchMergeTree() ORDER BY tuple()'
)
client.execute('INSERT INTO data_csv VALUES', iter_csv('data.csv'))

根据列名检索数据

通过设置“with_column_types”= True ,可以使用按列名检索数据。

from clickhouse_driver import Client

# client = Client(...) # Initialize client

client.execute("""
CREATE TABLE IF NOT EXISTS test_table (
    int_column Int32,
    string_column String,
    array_column Array(Int32)
) ENGINE = CnchMergeTree()
ORDER BY int_column
""")

data_to_insert = [
    (1, 'First', [1, 2, 3]),
    (2, 'Second', [4, 5, 6]),
    (3, 'Third', [7, 8, 9])
]

client.execute(
    "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
    data_to_insert
)

query = "SELECT * FROM test_table"
data, columns = client.execute(query, with_column_types=True)

# Use dictionary comprehension to create a dictionary with column names as keys
column_data = {col_name: [row[i] for row in data] for i, col_name in enumerate([col[0] for col in columns])}

# Access by column names
print("int_column:", column_data['int_column'])
print("string_column:", column_data['string_column'])
print("array_column:", column_data['array_column'])