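This article describes how to connect to and access the ByteHouse cloud data warehouse from a Python development environment.
Python 3.12 or later is recommended.
The Python driver (clickhouse-driver) v0.2.9 or later is recommended.
When you import data with clickhouse-driver, serializing data chunks on the Python side is expensive. The driver is therefore not suited to large-scale writes, and this includes using clickhouse-driver to benchmark ByteHouse write throughput.
The ByteHouse JSONB and Bitmap64 data types are not supported.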
Currently supported data types:

| Data Type | Insert Type | Select Type |
|---|---|---|
| [U]Int8/16/32/64/128/256 | int, long | int |
| Float32/64 | float, int, long | float |
| Date/Date32 | date, datetime | date |
| DateTime | datetime, int, long | datetime |
| String/FixedString(N) | str, bytes | str, bytes |
| Enum8/16 | Enum, int, long, str | str |
| Array(T) | list, tuple | list |
| Bool | bool | bool |
| UUID | str, UUID | str |
| Decimal | Decimal, float, int, long | Decimal |
| IPv4/IPv6 | IPv4Address/IPv6Address, int, long, str | IPv4Address/IPv6Address |
If you run into any other limitations not listed here, contact the ByteHouse team.
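As an illustration of the Insert Type column in the table above, the sketch below builds one row from matching Python objects. The table demo.types_example and its column names are hypothetical, and the INSERT is left as a comment because it needs a live connection.

```python
import ipaddress
import uuid
from datetime import date, datetime
from decimal import Decimal

# Hypothetical table, for illustration only:
# CREATE TABLE demo.types_example (
#     c_int UInt8, c_float Float64, c_date Date, c_dt DateTime,
#     c_str String, c_enum Enum8('a' = 1, 'b' = 2), c_arr Array(String),
#     c_bool Bool, c_uuid UUID, c_dec Decimal(10, 2), c_ip IPv4
# ) Engine = CnchMergeTree() ORDER BY tuple()

# One row, each value using a Python type from the "Insert Type" column
row = (
    42,                                                  # UInt8 <- int
    3.14,                                                # Float64 <- float
    date(2024, 1, 1),                                    # Date <- date
    datetime(2024, 1, 1, 12, 30, 0),                     # DateTime <- datetime
    "ByteHouse",                                         # String <- str
    "a",                                                 # Enum8 <- str (label)
    ["x", "y"],                                          # Array(String) <- list
    True,                                                # Bool <- bool
    uuid.UUID("12345678-1234-5678-1234-567812345678"),   # UUID <- UUID
    Decimal("12.34"),                                    # Decimal(10, 2) <- Decimal
    ipaddress.IPv4Address("10.0.0.1"),                   # IPv4 <- IPv4Address
)

# With a connected client (see the connection examples below):
# client.execute("INSERT INTO demo.types_example VALUES", [row])
```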
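On Alpine Linux, you may need to install the following dependencies:

```shell
apk add gcc musl-dev
```

Install the latest released version of clickhouse-driver with the following command:

```shell
pip3 install clickhouse-driver
```

To install the development version, use the following command. The latest documentation and release information are available on the project's GitHub page.

```shell
pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver
```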
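After installation, you can check whether the installed driver meets the recommended v0.2.9. This is a minimal sketch using the standard library's importlib.metadata; the helpers parse_version and meets_minimum are hypothetical, and the parser only compares the leading numeric parts of a version string (so "0.2.10.dev1" is treated as 0.2.10).

```python
from importlib.metadata import PackageNotFoundError, version

MIN_VERSION = (0, 2, 9)  # recommended minimum from this guide


def parse_version(text):
    """Turn '0.2.9' or '0.2.10.dev1' into a tuple of its leading integers."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break  # stop at the first non-numeric component, e.g. 'dev1'
        parts.append(int(digits))
    return tuple(parts)


def meets_minimum(text, minimum=MIN_VERSION):
    return parse_version(text) >= minimum


if __name__ == "__main__":
    try:
        installed = version("clickhouse-driver")
    except PackageNotFoundError:
        print("clickhouse-driver is not installed")
    else:
        status = "OK" if meets_minimum(installed) else "older than recommended"
        print(f"clickhouse-driver {installed}: {status}")
```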
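ByteHouse supports connecting through the ClickHouse Python Driver as either an IAM user or a database user. Choose the type that fits your needs.
For more information about IAM users and database users, see the following documents:
To learn how to connect to ByteHouse as an IAM user, see Step 3: Obtain ByteHouse connection string information.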
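Common parameters:

| Parameter | Connecting as an IAM user |
|---|---|
| host | The public or private domain name of ByteHouse. You can view and copy it in the ByteHouse console under Tenant Management > Basic Information > Network Information. For details, see Step 2: Configure network information. |
| port | Use the default value 19000. |
| user & password | |
| database | The name of the ByteHouse database to connect to. |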
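To learn how to connect to ByteHouse as a database user, see Step 3: Obtain ByteHouse connection string information.
Common parameters:

| Parameter | Connecting as a database user |
|---|---|
| host | The public or private domain name of ByteHouse. You can view and copy it in the ByteHouse console under Tenant Management > Basic Information > Network Information. For details, see Step 2: Configure network information. |
| port | Use the default value 19000. |
| user & password | |
| database | The name of the ByteHouse database to connect to. |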
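To avoid hard-coding the parameters above in source files, a minimal sketch can read them from environment variables. The variable names (BYTEHOUSE_HOST and so on) and the helper load_connection_params are hypothetical; only the default port 19000 comes from the tables above.

```python
import os

# Hypothetical environment variable names; adjust to your own conventions
ENV_KEYS = {
    "host": "BYTEHOUSE_HOST",
    "user": "BYTEHOUSE_USER",
    "password": "BYTEHOUSE_PASSWORD",
    "database": "BYTEHOUSE_DATABASE",
}


def load_connection_params(env=os.environ):
    """Collect host/user/password/database, defaulting the port to 19000."""
    missing = [name for name in ENV_KEYS.values() if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    params = {key: env[name] for key, name in ENV_KEYS.items()}
    params["port"] = int(env.get("BYTEHOUSE_PORT", "19000"))
    return params


# Usage (needs clickhouse-driver and valid credentials):
# from clickhouse_driver import Client
# client = Client(**load_connection_params(), secure=True, verify=False,
#                 settings={"virtual_warehouse": "{VIRTUAL_WAREHOUSE_ID}"})
```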
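You can use the following code to connect to ByteHouse and start developing with standard statements to query, write, and read data. Replace the connection fields such as {Host}, {Password}, {User}, {Database}, and {VIRTUAL_WAREHOUSE_ID} with your own values; for how to obtain them, see Obtain ByteHouse connection information.
Keep connections alive: reuse an existing connection instead of opening short-lived ones. ByteHouse supports configuring a connection either through a Client object or through a URL.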
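```python
from clickhouse_driver import Client

# Replace the placeholders with your own connection information
host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,   # connect over TLS
    verify=False,  # do not verify the server certificate
    settings={"virtual_warehouse": virtual_warehouse_id},
)
```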
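To follow the connection-reuse advice, create a client once and share it instead of building a new one per query. The helper make_shared_client below is hypothetical: it caches one instance per thread, on the assumption that a single clickhouse-driver connection should not run queries from several threads at the same time.

```python
import threading


def make_shared_client(factory):
    """Return a getter that calls factory() once per thread and reuses the result."""
    local = threading.local()

    def get_client():
        client = getattr(local, "client", None)
        if client is None:
            client = factory()  # created only on the first call in this thread
            local.client = client
        return client

    return get_client


# Usage (needs clickhouse-driver and real connection values):
# from clickhouse_driver import Client
# get_client = make_shared_client(lambda: Client(host="{Host}", port=19000, ...))
# get_client().execute("SELECT 1")
```

A per-thread cache keeps the sketch simple; a connection pool would be another option but is not covered here.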
Note

When you connect with a database account through a URL, set user in the format {accountID_or_accountName}%3A%3A{username}[%3A%3A{envID}], where %3A%3A is the URL-encoded form of ::. For example, 21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv or 21xxxxxxxx%3A%3Ademouser.

```python
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"  # database users: e.g. 21xxxxxxxx%3A%3Ademouser
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

uri = (
    f"clickhouse://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
)
client = Client.from_url(uri)
```
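The percent-encoded user value and the URL can also be assembled in code. This sketch uses urllib.parse.quote from the Python standard library; build_url_user and build_uri are hypothetical helpers, and encoding the password as well assumes Client.from_url decodes percent-encoded credentials, which the %3A%3A user format above already relies on.

```python
from urllib.parse import quote


def build_url_user(account, username, env_id=None):
    """Join account, username and optional envID with '::', then percent-encode."""
    parts = [account, username] + ([env_id] if env_id else [])
    return quote("::".join(parts), safe="")  # ':' becomes %3A


def build_uri(host, port, user, password, database, virtual_warehouse_id):
    # user is expected to be encoded already (e.g. by build_url_user);
    # the password is encoded here in case it contains '@', ':' or '/'
    return (
        f"clickhouse://{user}:{quote(password, safe='')}@{host}:{port}/{database}"
        f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
    )


user = build_url_user("21xxxxxxxx", "demouser", "demoenv")
print(user)  # 21xxxxxxxx%3A%3Ademouser%3A%3Ademoenv
```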
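You can set a custom query ID for an individual query by passing query_id to execute():

```python
import uuid

# Assumes client was created as shown above
client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")
```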
Settings passed to Client when connecting apply to every query run on that client:

```python
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={
        "virtual_warehouse": virtual_warehouse_id,
        "max_execution_time": 450,
        "max_block_size": 10000,
    },
)

try:
    # This query automatically uses the settings given when connecting
    result = client.execute("SELECT getSetting('max_execution_time')")
    print(f"Global setting value: {result[0][0]}")
except Exception as e:
    print(f"Global settings example failed: {e}")
finally:
    client.disconnect()
```
Settings passed to execute() apply only to that query:

```python
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

try:
    settings = {"max_execution_time": 450}  # applies to this query only
    query = "SELECT getSetting('max_execution_time')"
    result = client.execute(query, settings=settings)
    if result:
        max_execution = result[0][0]
        print(f"max_execution_time: {max_execution}")
        assert max_execution == 450
except Exception as e:
    print(f"Per-query settings example failed: {e}")
finally:
    client.disconnect()
```
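You can use the following code to connect to ByteHouse and manage data. Replace the connection fields in the code with the values described in Obtain connection information.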
```python
import uuid
from datetime import datetime

from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

# Connect through a URL; Client(host=..., ..., settings={"virtual_warehouse": ...})
# as shown earlier works equally well.
uri = (
    f"clickhouse://{user}:{password}@{host}:{port}/{database}"
    f"?secure=True&verify=False&virtual_warehouse={virtual_warehouse_id}"
)
client = Client.from_url(uri)

# Recreate the test database; query_id tags the statement with a custom ID
client.execute("DROP DATABASE IF EXISTS bhpythontest", query_id=f"customized_{uuid.uuid4()}")
client.execute("CREATE DATABASE IF NOT EXISTS bhpythontest")
client.execute(
    """
    CREATE TABLE IF NOT EXISTS bhpythontest.example
    (
        Col1 UInt8,
        Col2 String,
        Col3 FixedString(3),
        Col4 UUID,
        Col5 Map(String, UInt8),
        Col6 Array(String),
        Col7 Tuple(String, UInt8, Array(Map(String, String))) KV,
        Col8 DateTime
    )
    Engine = CnchMergeTree()
    ORDER BY tuple()
    """
)

# Prepare 1000 rows to insert
values_list = []
for _ in range(1000):
    value = (
        42,                                  # Col1: UInt8
        "ClickHouse",                        # Col2: String
        "Inc",                               # Col3: FixedString(3)
        str(uuid.uuid4()),                   # Col4: UUID
        {"key": 1},                          # Col5: Map(String, UInt8)
        ["Q", "W", "E", "R", "T", "Y"],      # Col6: Array(String)
        ("String Value", 5, [{"key": "value"}, {"key": "value"}, {"key": "value"}]),  # Col7: Tuple
        datetime.now(),                      # Col8: DateTime
    )
    values_list.append(value)

# Batch insert
insert_sql = "INSERT INTO bhpythontest.example (Col1, Col2, Col3, Col4, Col5, Col6, Col7, Col8) VALUES"
client.execute(insert_sql, values_list)
print("Inserted 1000 rows")

# Query one row and print each column
result = client.execute("SELECT * FROM bhpythontest.example LIMIT 1")
if result:
    print("Query result:")
    for i, col in enumerate(result[0], start=1):
        print(f"Col{i}: {col}")

# Drop the table and close the connection
client.execute("DROP TABLE IF EXISTS bhpythontest.example")
print("Table dropped")
client.disconnect()
```
The 'use_numpy': True setting requires the NumPy and pandas packages to be installed. See the following example:
```python
import pandas as pd
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id, "use_numpy": True},
)

client.execute(
    "CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()"
)

# Insert a DataFrame, then read the table back as a DataFrame
df_insert = pd.DataFrame({"x": [1, 2], "y": ["apple", "banana"]}, dtype=object)
client.insert_dataframe("INSERT INTO test_df VALUES", df_insert)

df_select = client.query_dataframe("SELECT * FROM test_df")
print(df_select)
```
Suppose your CSV file contains the following data:

```csv
time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7
```
You can insert the data into ByteHouse as follows:

```python
from csv import DictReader
from datetime import datetime

from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)


def iter_csv(filename):
    # Convert each CSV field to the Python type expected by its column
    converters = {
        "time": lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
        "order": str,
        "qty": int,
    }
    with open(filename, "r", newline="") as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


client.execute(
    "CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) "
    "Engine = CnchMergeTree() ORDER BY tuple()"
)
# Rows are streamed from the generator as dicts keyed by column name
client.execute("INSERT INTO data_csv VALUES", iter_csv("data.csv"))
```
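To check the type conversion before inserting anything, you can run the same converter logic on the sample data locally, without a ByteHouse connection. convert_rows is a hypothetical variant of iter_csv that reads from any text file object, here an io.StringIO holding the sample rows.

```python
import io
from csv import DictReader
from datetime import datetime

SAMPLE = """time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7
"""

converters = {
    "time": lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"),
    "order": str,
    "qty": int,
}


def convert_rows(fileobj):
    """Same conversion as iter_csv, but reading from an open text file object."""
    for line in DictReader(fileobj):
        yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


rows = list(convert_rows(io.StringIO(SAMPLE)))
for r in rows:
    print(r)
```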
Setting with_column_types=True returns the column names and types together with the rows, so you can retrieve data by column name:

```python
from clickhouse_driver import Client

host = "{Host}"
port = 19000
password = "{Password}"
user = "{User}"
database = "{Database}"
virtual_warehouse_id = "{VIRTUAL_WAREHOUSE_ID}"

client = Client(
    host=host,
    port=port,
    user=user,
    password=password,
    database=database,
    secure=True,
    verify=False,
    settings={"virtual_warehouse": virtual_warehouse_id},
)

client.execute("""
    CREATE TABLE IF NOT EXISTS test_table (
        int_column Int32,
        string_column String,
        array_column Array(Int32)
    ) ENGINE = CnchMergeTree()
    ORDER BY int_column
""")

data_to_insert = [
    (1, 'First', [1, 2, 3]),
    (2, 'Second', [4, 5, 6]),
    (3, 'Third', [7, 8, 9]),
]
client.execute(
    "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
    data_to_insert,
)

# data is a list of row tuples; columns is a list of (name, type) tuples
data, columns = client.execute("SELECT * FROM test_table", with_column_types=True)

# Build a dictionary with column names as keys and column values as lists
column_data = {col_name: [row[i] for row in data]
               for i, col_name in enumerate([col[0] for col in columns])}

# Access by column names
print("int_column:", column_data['int_column'])
print("string_column:", column_data['string_column'])
print("array_column:", column_data['array_column'])
```
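If you prefer one dictionary per row rather than one list per column, the same (data, columns) pair can be zipped row by row. rows_as_dicts is a hypothetical helper, and the sample values below only mimic the shape returned by execute(..., with_column_types=True) so that the sketch runs without a connection.

```python
def rows_as_dicts(data, columns):
    """Pair each row with the column names from a with_column_types=True result."""
    names = [name for name, _type in columns]  # columns holds (name, type) tuples
    return [dict(zip(names, row)) for row in data]


# Offline sample shaped like execute(..., with_column_types=True) output
data = [(1, 'First', [1, 2, 3]), (2, 'Second', [4, 5, 6])]
columns = [('int_column', 'Int32'), ('string_column', 'String'), ('array_column', 'Array(Int32)')]

records = rows_as_dicts(data, columns)
print(records[1]['string_column'])  # prints Second
```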