You need to enable JavaScript to run this app.
ByteHouse云数仓版

ByteHouse云数仓版

复制全文
Python
ClickHouse Python Driver
复制全文
ClickHouse Python Driver

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。

环境要求

建议使用 Python 3.12 或更高版本。

推荐版本

Python driver 推荐 v0.2.9 及以上版本。

使用限制
  • 使用 ClickHouse-Driver 进行数据导入时,Python 侧对数据 chunk 序列化开销比较大,因此不适用于大规模数据写入场景(包括利用ClickHouse-driver 测试 ByteHouse 的写入吞吐性能)。

  • 不支持 ByteHouse 的 JSONB 和 Bitmap64 的数据类型。

  • 当前支持的数据类型:

    数据类型

    Insert Type

    Select Type

    [U]Int8/16/32/64/128/256

    int, long

    int

    Float32/64

    float, int, long

    float

    Date/Date32

    date, datetime

    date

    DateTime

    datetime, int, long

    datetime

    String/FixedString(N)

    str, bytes

    str, bytes

    Enum8/16

    Enum, int, long, str

    str

    Array(T)

    list, tuple

    list

    Bool

    bool

    bool

    UUID

    str, UUID

    str

    Decimal

    Decimal, float, int, long

    Decimal

    IPv4/IPv6

    IPv4Address/IPv6Address, int, long, str

    IPv4Address/IPv6Address

  • 如果您在使用过程中遇到其他未知限制,请联系 ByteHouse 团队处理。

安装程序

安装依赖项

您可能需要安装以下依赖项:

apk add gcc musl-dev

从 PyPI 安装

可以通过如下命令,获取最新发布版本的 clickhouse-driver。

pip3 install clickhouse-driver

从 GitHub 安装

开发版本通过如下命令安装。您可以在程序 Github 主页 获取最新的文档和发布版本信息。

pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver

获取 ByteHouse 连接信息

ByteHouse 支持通过 IAM 用户或数据库用户连接 ClickHouse Python Driver。IAM 用户与数据库用户二者差异说明如下,您可按需选择。

  • IAM 用户为火山引擎访问控制(IAM)中创建的用户,其权限由 IAM 权限策略及您授予的 ByteHouse 资源和数据权限决定。IAM 用户可访问 ByteHouse 控制台,也支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。
  • 数据库用户为 ByteHouse 中创建的数据库级别用户,可为其授予环境、资源和数据权限。数据库用户不可访问 ByteHouse 控制台,但支持通过 CLI、连接驱动、生态工具、API 等方式访问 ByteHouse。

更多 IAM 用户和数据库用户的介绍请参见以下文档:

使用 IAM 用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过 IAM 用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用 IAM 用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为默认值 19000。

user & password

  • user:固定配置为 bytehouse
  • password:为 ByteHouse 的 <API_Key>,您可以在 ByteHouse 控制台的 租户管理 > 连接信息 中获取API Key。详情请参见获取 API Key

database

配置为连接 ByteHouse 的数据库名称。

使用数据库用户连接

请参考步骤三:获取 ByteHouse 连接串信息,了解如何通过数据库用户方式连接到 ByteHouse。
通用参数说明如下:

参数

使用数据库用户连接

host

配置为 ByteHouse 的公网/私网域名,您可以在 ByteHouse 控制台的 租户管理 > 基本信息 > 网络信息中查看并复制网络信息。详情请参见步骤二:配置网络信息

port

配置为默认值 19000。

user & password

  • user:参数填写说明如下,详情请参见获取数据库用户及密码

    注意

    根据您使用的连接方式不同,user 的拼接方式不同:

    • 使用 Client 对象配置连接时,user 配置为 {accountID_or_accountName}::{username}[::{envID}]
    • 使用 URL 配置连接时,user 配置为 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}]
    • {accountID_or_accountName} :指火山引擎用户账号 ID 或名称,可登录 ByteHouse 控制台,单击右上角个人中心查看并复制账号 ID 或名称。
    • {username} :指登录 ByteHouse 数据库的用户名,可在 ByteHouse 控制台 > 权限管理 > 用户 > 查看数据库用户名
    • {envID}:可选配置,数据库所在的环境名称。如果使用 default 环境,可不配置;如需使用其他环境,需指定环境名称,配置时无需添加[]。您可登录 ByteHouse 控制台,在租户管理 > 基本信息 > 当前环境中获取。
      使用示例如下:
      • 配置环境 ID:21xxxxxxxx::demouser::demoenv21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
      • 不配置环境 ID:21xxxxxxxx::demouser21xxxxxxxx%3A%3Ademouser
  • password:可联系管理员获取数据库账号的密码。如果密码丢失或遗忘,可通联系管理员重置密码,详情请参考重置密码

database

配置为连接 ByteHouse 的数据库名称。

基本用法

您可以使用以下代码连接至 ByteHouse,并开始使用标准语句开发 ByteHouse,用于查询、写入和读取数据。使用时注意替换连接语句中的 {Host}{Password}{User}{Database}{VIRTUAL_WAREHOUSE_ID} 等连接信息字段,获取方式请参见获取 ByteHouse 连接信息

  • 超时时间配置:connect_timeout 默认为 10s、send_receive_timeout 默认为 300s。
  • 默认支持 keepAlive,可以复用连接和避免短链接。

连接 ByteHouse

ByteHouse 支持通过 Client 对象或通过 URL 配置连接。

Client 对象配置连接

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

通过 URL 配置连接

注意

使用数据库账号通过 URL 配置连接时,user 需按照 {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}] 格式填写。例如:

  • 配置环境 ID:21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
  • 不配置环境 ID:21xxxxxxxx%3A%3Ademouser
host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"


uri = f"clickhouse://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
client = Client.from_url(uri)

自定义 query ID

您可使用 Context 设置 query ID。

client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")

自定义 query settings

全局设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={
        'max_execution_time': 450,
        'max_block_size': 10000
    },
)

try:
    # 这个查询会自动使用连接时设置的参数
    result = client.execute("SELECT getSetting('max_execution_time')")
    setting_value = result[0][0]
    print(f"全局设置值: {setting_value}")
except Exception as e:
    print(f"全局设置示例错误: {e}")
finally:
    client.disconnect()

SQL 级别设置

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False
)

try:
    settings = {
        'max_execution_time': 450
    }
    query = """
            SELECT 
                getSetting('max_execution_time')
            """
    result = client.execute(query, settings=settings)
    if result:
        max_execution = result[0][0]
        print(f"max_execution_time: {max_execution}")
        assert max_execution == 450
        print(f"设置值: {max_execution}")
except Exception as e:
    print(f"设置示例错误: {e}")

client.disconnect()

连接及查询完整示例

您可以使用以下代码连接 ByteHouse,并管理数据。请按照获取连接信息中的信息替换代码中连接配置的字段。

import uuid
import time
from datetime import datetime
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

# client = Client(
#     host=host,
#     port=port,
#     user=user,
#     password=password,
#     database=database,
#     secure=True,
#     verify=False,
#     settings={"virtual_warehouse": virtual_warehouse_id},
# )

uri = f"clickhouse://{user}:{password}@{host}:{port}/{database}?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
client = Client.from_url(uri)

client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")
client.execute('DROP DATABASE IF EXISTS bhpythontest')
client.execute('CREATE DATABASE IF NOT EXISTS bhpythontest')
client.execute(
"""
CREATE TABLE IF NOT EXISTS bhpythontest.example (
        Col1 UInt8
    , Col2 String
    , Col3 FixedString(3)
    , Col4 UUID
    , Col5 Map(String, UInt8)
    , Col6 Array(String)
    , Col7 Tuple(String, UInt8, Array(Map(String, String))) KV
    , Col8 DateTime
) Engine = CnchMergeTree() ORDER BY tuple()
""")

# 准备插入数据
values_list = []
for i in range(1000):
    value = (
        42,  # uint8(42)
        'ClickHouse',
        'Inc',
        str(uuid.uuid4()),
        {'key': 1},  # Map(String, UInt8)
        ['Q', 'W', 'E', 'R', 'T', 'Y'],  # Array(String)
        ('String Value', 5, [{'key': 'value'},{'key': 'value'},{'key': 'value'}]),  # Tuple
        datetime.now()
    )
    values_list.append(value)

# 批量插入
insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES"
client.execute(insert_sql, values_list)
print("数据插入成功,插入了1000条记录")

# 查询一条记录
select_sql = "SELECT * FROM bhpythontest.example LIMIT 1"
result = client.execute(select_sql)

if result:
    Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8 = result[0]
    print(f"查询结果:")
    print(f"Col1: {Col1}")
    print(f"Col2: {Col2}")
    print(f"Col3: {Col3}")
    print(f"Col4: {Col4}")
    print(f"Col5: {Col5}")
    print(f"Col6: {Col6}")
    print(f"Col7: {Col7}")
    print(f"Col8: {Col8}")

# 删除表
drop_sql = "DROP TABLE IF EXISTS bhpythontest.example"
client.execute(drop_sql)
print("表已删除")

client.disconnect()

从 Pandas dataframe 插入数据

'use_numpy': True 设置需要 Numpy / Pandas 包的支持,可参考下面的示例。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id,"use_numpy": True},
)

client.execute(
   'CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()'
)

df_insert = pd.DataFrame({
    'x': [1, 2],
    'y': ['apple', 'banana'],
}, dtype=object)

client.insert_dataframe('INSERT INTO test_df VALUES', df_insert)

df_select = client.query_dataframe('SELECT * FROM test_df')

print(df_select)

从 CSV 文件插入数据

假设您的 CSV 文件中有以下数据:

time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7

可以通过以下方式将数据插入 ByteHouse:

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

def iter_csv(filename):
    converters = {
        'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
        'order': str,
        'qty': int,
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


client.execute(
    'CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) Engine = CnchMergeTree() ORDER BY tuple()'
)
client.execute('INSERT INTO data_csv VALUES', iter_csv('data.csv'))

根据列名检索数据

通过设置 “with_column_types”= True ,可以使用按列名检索数据。

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

client.execute("""
CREATE TABLE IF NOT EXISTS test_table (
    int_column Int32,
    string_column String,
    array_column Array(Int32)
) ENGINE = CnchMergeTree()
ORDER BY int_column
""")

data_to_insert = [
    (1, 'First', [1, 2, 3]),
    (2, 'Second', [4, 5, 6]),
    (3, 'Third', [7, 8, 9])
]

client.execute(
    "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
    data_to_insert
)

query = "SELECT * FROM test_table"
data, columns = client.execute(query, with_column_types=True)

# Use dictionary comprehension to create a dictionary with column names as keys
column_data = {col_name: [row[i] for row in data] for i, col_name in enumerate([col[0] for col in columns])}

# Access by column names
print("int_column:", column_data['int_column'])
print("string_column:", column_data['string_column'])
print("array_column:", column_data['array_column'])
最近更新时间:2025.11.13 16:32:03
这个页面对您有帮助吗?
有用
有用
无用
无用