You need to enable JavaScript to run this app.
导航
ClickHouse Python Driver
最近更新时间:2025.11.13 16:32:03首次发布时间:2023.12.18 12:14:26
复制全文
我的收藏
有用
有用
无用
无用

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。

环境要求

建议使用 Python 3.12 或更高版本。

推荐版本

Python driver 推荐 v0.2.9 及以上版本。

使用限制
  • 使用 ClickHouse-Driver 进行数据导入时,Python 侧对数据 chunk 序列化开销比较大,因此不适用于大规模数据写入场景(包括利用ClickHouse-driver 测试 ByteHouse 的写入吞吐性能)。

  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。

  • 当前支持的数据类型:

    数据类型

    Insert Type

    Select Type

    [U]Int8/16/32/64/128/256

    int, long

    int

    Float32/64

    float, int, long

    float

    Date/Date32

    date, datetime

    date

    DateTime

    datetime, int, long

    datetime

    String/FixedString(N)

    str, bytes

    str, bytes

    Enum8/16

    Enum, int, long, str

    str

    Array(T)

    list, tuple

    list

    Bool

    bool

    bool

    UUID

    str, UUID

    str

    Decimal

    Decimal, float, int, long

    Decimal

    IPv4/IPv6

    IPv4Address/IPv6Address, int, long, str

    IPv4Address/IPv6Address

  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装程序

安装依赖项

您可能需要安装以下依赖项:

apk add gcc musl-dev

从 PyPI 安装

可以通过如下命令,获取最新发布版本的 clickhouse-driver。

pip3 install clickhouse-driver

从 GitHub 安装

开发版本通过如下命令安装。您可以在程序 Github 主页 获取最新的文档和发布版本信息。

pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse Python Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用 IAM 用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为默认值 19000。

user & password

  • user:固定配置为 bytehouse
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息 中获取API Key。详情请参见获取 API Key

database

配置为连接 ByteHouse 的数据库名称。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用数据库用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为默认值 19000。

user & password

  • user:参数填写说明如下,详情请参见获取数据库用户及密码

    注意

    根据您使用的连接方式不同,user 的拼接方式不同:

    • 使用 Client 对象配置连接时,user 配置为 {accountID_or_accountName}::{username}[::{envID}]
    • 使用 URL 配置连接时,user 配置为 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}]
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名,可在 ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx::demouser::demoenv21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
      • 不配置环境 ID:21xxxxxxxx::demouser21xxxxxxxx%3A%3Ademouser
  • password:可联系管理员获取数据库账号的密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

配置为连接 ByteHouse 的数据库名称。

基本用法

您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 超时时间配置:connect_timeout 默认为 10s、send_receive_timeout 默认为 300s。
  • 默认支持 keepAlive,可以复用连接和避免短链接。

连接 ByteHouse

ByteHouse 支持通过 Client 对象或通过 URL 配置连接。

Client 对象配置连接

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

通过 URL 配置连接

注意

使用数据库账号通过 URL 配置连接时,user 需按照 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}] 格式填写。例如:

  • 配置环境 ID:21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
  • 不配置环境 ID:21xxxxxxxx%3A%3Ademouser
host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"


uri = f"clickhouse://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
client = Client.from_url(uri)

自定义 query ID

您可使用 Context 设置 query ID。

client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")

自定义 query settings

全局设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={
        'max_execution_time': 450,
        'max_block_size': 10000
    },
)

try:
    # 这个查询会自动使用连接时设置的参数
    result = client.execute("SELECT getSetting('max_execution_time')")
    setting_value = result[0][0]
    print(f"全局设置值: {setting_value}")
except Exception as e:
    print(f"全局设置示例错误: {e}")
finally:
    client.disconnect()

SQL 级别设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False
)

try:
    settings = {
        'max_execution_time': 450
    }
    query = """
            SELECT 
                getSetting('max_execution_time')
            """
    result = client.execute(query, settings=settings)
    if result:
        max_execution = result[0][0]
        print(f"max_execution_time: {max_execution}")
        assert max_execution == 450
        print(f"设置值: {max_execution}")
except Exception as e:
    print(f"设置示例错误: {e}")

client.disconnect()

连接及查询完整示例

您可以使用以下代码连接 ByteHouse,并管理数据。请按照获取连接信息中的信息替换代码中连接配置的字段。

import uuid
import time
from datetime import datetime
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

# client = Client(
#     host=host,
#     port=port,
#     user=user,
#     password=password,
#     database=database,
#     secure=True,
#     verify=False,
#     settings={"virtual_warehouse": virtual_warehouse_id},
# )

uri = f"clickhouse://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
client = Client.from_url(uri)

client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")
client.execute('DROP DATABASE IF EXISTS bhpythontest')
client.execute('CREATE DATABASE IF NOT EXISTS bhpythontest')
client.execute(
"""
CREATE TABLE IF NOT EXISTS bhpythontest.example (
        Col1 UInt8
    , Col2 String
    , Col3 FixedString(3)
    , Col4 UUID
    , Col5 Map(String, UInt8)
    , Col6 Array(String)
    , Col7 Tuple(String, UInt8, Array(Map(String, String))) KV
    , Col8 DateTime
) Engine = CnchMergeTree() ORDER BY tuple()
""")

# 准备插入数据
values_list = []
for i in range(1000):
    value = (
        42,  # uint8(42)
        'ClickHouse',
        'Inc',
        str(uuid.uuid4()),
        {'key': 1},  # Map(String, UInt8)
        ['Q', 'W', 'E', 'R', 'T', 'Y'],  # Array(String)
        ('String Value', 5, [{'key': 'value'},{'key': 'value'},{'key': 'value'}]),  # Tuple
        datetime.now()
    )
    values_list.append(value)

# 批量插入
insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES"
client.execute(insert_sql, values_list)
print("数据插入成功,插入了1000条记录")

# 查询一条记录
select_sql = "SELECT * FROM bhpythontest.example LIMIT 1"
result = client.execute(select_sql)

if result:
    Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8 = result[0]
    print(f"查询结果:")
    print(f"Col1: {Col1}")
    print(f"Col2: {Col2}")
    print(f"Col3: {Col3}")
    print(f"Col4: {Col4}")
    print(f"Col5: {Col5}")
    print(f"Col6: {Col6}")
    print(f"Col7: {Col7}")
    print(f"Col8: {Col8}")

# 删除表
drop_sql = "DROP TABLE IF EXISTS bhpythontest.example"
client.execute(drop_sql)
print("表已删除")

client.disconnect()

从 Pandas dataframe 插入数据

'use_numpy': True 设置需要 Numpy / Pandas 包的支持,可参考下面的示例。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id,"use_numpy": True},
)

client.execute(
   'CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()'
)

df_insert = pd.DataFrame({
    'x': [1, 2],
    'y': ['apple', 'banana'],
}, dtype=object)

client.insert_dataframe('INSERT INTO test_df VALUES', df_insert)

df_select = client.query_dataframe('SELECT * FROM test_df')

print(df_select)

从 CSV 文件插入数据

假设您的 CSV 文件中有以下数据:

time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7

可以通过以下方式将数据插入 ByteHouse:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

def iter_csv(filename):
    converters = {
        'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
        'order': str,
        'qty': int,
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


client.execute(
    'CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) Engine = CnchMergeTree() ORDER BY tuple()'
)
client.execute('INSERT INTO data_csv VALUES', iter_csv('data.csv'))

根据列名检索数据

通过设置 “with_column_types”= True ,可以使用按列名检索数据。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

client.execute("""
CREATE TABLE IF NOT EXISTS test_table (
    int_column Int32,
    string_column String,
    array_column Array(Int32)
) ENGINE = CnchMergeTree()
ORDER BY int_column
""")

data_to_insert = [
    (1, 'First', [1, 2, 3]),
    (2, 'Second', [4, 5, 6]),
    (3, 'Third', [7, 8, 9])
]

client.execute(
    "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
    data_to_insert
)

query = "SELECT * FROM test_table"
data, columns = client.execute(query, with_column_types=True)

# Use dictionary comprehension to create a dictionary with column names as keys
column_data = {col_name: [row[i] for row in data] for i, col_name in enumerate([col[0] for col in columns])}

# Access by column names
print("int_column:", column_data['int_column'])
print("string_column:", column_data['string_column'])
print("array_column:", column_data['array_column'])