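This article describes how to connect to and access the ByteHouse cloud data warehouse from a Python development environment.
ByteHouse is compatible with the open-source ClickHouse Python driver mymarilyn/clickhouse-driver.
Note
Python 3.8 or later is required.
For the connection information required to access ByteHouse, see Obtaining Connection Information.
You may need to install the following build dependencies first (the command below uses apk, the Alpine Linux package manager):

    apk add gcc musl-dev
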
Install the latest released version of clickhouse-driver with the following command:

    pip3 install clickhouse-driver

Install the development version with the following command:

    pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver

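Connection example
This section covers the basic usage of mymarilyn/clickhouse-driver for connecting to ByteHouse. The latest documentation and release information are available on the driver's GitHub home page.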
Data type | Insert type | Select type |
---|---|---|
[U]Int8/16/32/64/128/256 | int | int |
Float32/64 | float, int | float |
Date/Date32 | date, datetime | date |
DateTime | datetime, int | datetime |
String/FixedString(N) | str, bytes | str, bytes |
Enum8/16 | Enum, int, str | str |
Array(T) | list, tuple | list |
Bool | bool | bool |
UUID | str, UUID | str |
Decimal | Decimal, float, int | Decimal |
IPv4/IPv6 | IPv4Address/IPv6Address, int, str | IPv4Address/IPv6Address |
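As a rough illustration of the table above, the sketch below builds one row of Python values whose types match the listed insert types. The table layout in the comment is hypothetical and is not part of the ByteHouse samples; only standard-library types are used.

```python
from datetime import date, datetime
from decimal import Decimal
from ipaddress import IPv4Address
from uuid import UUID

# Hypothetical target table (assumption, for illustration only):
#   id UInt32, price Decimal(10, 2), day Date, created_at DateTime,
#   tags Array(String), ip IPv4, uid UUID
row = (
    1,                                             # UInt32   <- int
    Decimal("19.99"),                              # Decimal  <- Decimal
    date(2024, 11, 20),                            # Date     <- date
    datetime(2024, 11, 20, 10, 30, 0),             # DateTime <- datetime
    ["new", "priority"],                           # Array(String) <- list
    IPv4Address("192.0.2.1"),                      # IPv4     <- IPv4Address
    UUID("bbd7dea3-eb63-4a21-b727-f55b420a7223"),  # UUID     <- UUID
)

# Show which Python type is used for each column value.
for value in row:
    print(type(value).__name__, value)
```

A list of such tuples is the shape that the insert examples later in this article pass to client.execute.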
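Use the sample code below to configure the ByteHouse connection. Replace the {HOST}, {API_KEY}, and {DATABASE} placeholders, and the port if it differs from 19000, with the values from Obtaining Connection Information.
Connect by passing the parameters to Client:

    from clickhouse_driver import Client

    client = Client(
        host="{HOST}",
        port=19000,
        user="bytehouse",
        password="{API_KEY}",
        database="{DATABASE}",
        secure=True,
    )

Alternatively, connect with a URL:

    from clickhouse_driver import Client

    uri = "clickhouse://bytehouse:{API_KEY}@{HOST}:19000/{DATABASE}?secure=True"
    client = Client.from_url(uri)
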
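If the API key contains characters that are special in URLs, such as @ or /, pasting it directly into the URL form can produce a malformed address. The sketch below shows one way to assemble the URL safely. The helper name build_bytehouse_url and the sample values are hypothetical, and it assumes the driver percent-decodes the credentials it reads from the URL.

```python
from urllib.parse import quote


def build_bytehouse_url(host, api_key, database, port=19000):
    """Hypothetical helper: build a clickhouse:// URL for Client.from_url."""
    # Percent-encode the key so '@', '/', ':' and similar characters
    # cannot be mistaken for URL delimiters.
    password = quote(api_key, safe="")
    return f"clickhouse://bytehouse:{password}@{host}:{port}/{database}?secure=True"


# Sample values are placeholders, not real connection information.
url = build_bytehouse_url("example-host", "p@ss/word", "default")
print(url)
```

The resulting string can then be passed to Client.from_url in place of the hand-written uri.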
The sample code below runs a basic set of queries: it creates a table, inserts rows, and reads them back. Replace the {HOST}, {API_KEY}, and {DATABASE} placeholders, and the port if it differs from 19000, with the values from Obtaining Connection Information.

    from datetime import datetime

    from clickhouse_driver import Client

    # Connect to ByteHouse
    client = Client(
        host="{HOST}",
        port=19000,
        user="bytehouse",
        password="{API_KEY}",
        database="{DATABASE}",
        secure=True,
    )

    # Create the table
    create_table_query = """
    CREATE TABLE IF NOT EXISTS example_table
    (
        id Int32,
        name String,
        created_at DateTime
    )
    ENGINE = CnchMergeTree()
    ORDER BY id;
    """
    client.execute(create_table_query)

    # Insert data: one tuple per row, in column order
    data = [
        (1, 'Alice', datetime(2024, 11, 20, 10, 30, 0)),
        (2, 'Bob', datetime(2024, 11, 20, 11, 0, 0)),
        (3, 'Charlie', datetime(2024, 11, 20, 12, 15, 0)),
    ]
    client.execute("INSERT INTO example_table (id, name, created_at) VALUES", data)

    # Query data
    rows = client.execute("SELECT * FROM example_table")
    for row in rows:
        print(row)

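Rather than hard-coding the connection values in the script, they can be read from the environment. The following is a minimal sketch under stated assumptions: the variable names BYTEHOUSE_HOST, BYTEHOUSE_PORT, BYTEHOUSE_API_KEY, and BYTEHOUSE_DATABASE, the fallback database name, and the helper connection_kwargs are all hypothetical and not defined by ByteHouse or the driver.

```python
import os


def connection_kwargs(env=os.environ):
    """Hypothetical helper: collect Client keyword arguments from a mapping."""
    return {
        "host": env["BYTEHOUSE_HOST"],                    # required
        "port": int(env.get("BYTEHOUSE_PORT", "19000")),  # port used in the samples
        "user": "bytehouse",
        "password": env["BYTEHOUSE_API_KEY"],             # required
        "database": env.get("BYTEHOUSE_DATABASE", "default"),  # assumed fallback
        "secure": True,
    }


# Demonstration with a plain dict instead of the real environment.
example_env = {"BYTEHOUSE_HOST": "example-host", "BYTEHOUSE_API_KEY": "example-key"}
kwargs = connection_kwargs(example_env)
print(kwargs["host"], kwargs["port"])
# With the driver installed: client = Client(**kwargs)
```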
If the ByteHouse server supports a setting (for example, virtual_warehouse) that the open-source driver package does not define, you can register it with the driver as shown in the example below.

    from clickhouse_driver import Client
    from clickhouse_driver.settings.available import settings as available_settings, SettingString

    # Register the ByteHouse-specific setting so the driver accepts it
    available_settings['virtual_warehouse'] = SettingString

    client = Client(
        host="{HOST}",
        port=19000,
        user="bytehouse",
        password="{API_KEY}",
        database="{DATABASE}",
        secure=True,
        settings={"virtual_warehouse": "vw-123456789-vw-name"},
    )

To assign your own query ID to a query, pass query_id to execute:

    from clickhouse_driver import Client

    # client = Client(...)  # Initialize client

    query_id = "bbd7dea3-eb63-4a21-b727-f55b420a7223"
    client.execute("SELECT 1", query_id=query_id)

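A fixed query ID is convenient for a demonstration, but repeated runs would reuse the same value. One possible approach, sketched below, is to generate a fresh UUID for each query. The helper name new_query_id is hypothetical, and using a UUID is a choice made for this sketch rather than a documented ByteHouse requirement.

```python
import uuid


def new_query_id():
    """Hypothetical helper: return a random UUID string to use as a query ID."""
    return str(uuid.uuid4())


query_id = new_query_id()
print(query_id)
# With an initialized client:
# client.execute("SELECT 1", query_id=query_id)
```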
The 'use_numpy': True setting requires the NumPy and pandas packages. See the example below.

    import pandas as pd

    from clickhouse_driver import Client

    client = Client(
        host="{HOST}",
        port=19000,
        user="bytehouse",
        password="{API_KEY}",
        database="{DATABASE}",
        secure=True,
        settings={"use_numpy": True},
    )

    client.execute(
        'CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()'
    )

    # Insert a DataFrame
    df_insert = pd.DataFrame({
        'x': [1, 2],
        'y': ['apple', 'banana'],
    }, dtype=object)
    client.insert_dataframe('INSERT INTO test_df VALUES', df_insert)

    # Read the result back as a DataFrame
    df_select = client.query_dataframe('SELECT * FROM test_df')
    print(df_select)
    #    x       y
    # 0  1   apple
    # 1  2  banana

Suppose your CSV file contains the following data:

    time,order,qty
    2019-08-01 15:23:14,New order1,5
    2019-08-05 09:14:45,New order2,3
    2019-08-13 12:20:32,New order3,7

You can insert the data into ByteHouse as follows:

    from csv import DictReader
    from datetime import datetime

    from clickhouse_driver import Client

    # client = Client(...)  # Initialize client

    def iter_csv(filename):
        # Convert each CSV string field to the Python type of its target column
        converters = {
            'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
            'order': str,
            'qty': int,
        }

        with open(filename, 'r', newline='') as f:
            reader = DictReader(f)
            for line in reader:
                yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}

    client.execute(
        'CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) Engine = CnchMergeTree() ORDER BY tuple()'
    )

    # Rows are streamed from the generator, one dict per CSV line
    client.execute('INSERT INTO data_csv VALUES', iter_csv('data.csv'))

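Before pointing the insert at a real file, it can help to check that the converters produce the expected Python types. The sketch below applies the same conversion logic to an in-memory sample, so it runs without a ByteHouse connection. The names SAMPLE, CONVERTERS, and iter_rows are introduced here for illustration only.

```python
from csv import DictReader
from datetime import datetime
from io import StringIO

# In-memory stand-in for the first lines of data.csv
SAMPLE = """time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
"""

CONVERTERS = {
    'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
    'order': str,
    'qty': int,
}


def iter_rows(lines):
    """Yield one dict per CSV line, converting known columns."""
    for line in DictReader(lines):
        yield {k: (CONVERTERS[k](v) if k in CONVERTERS else v) for k, v in line.items()}


rows = list(iter_rows(StringIO(SAMPLE)))
for row in rows:
    print(row)
```

Each yielded dict has a datetime for time, a str for order, and an int for qty, which matches the DateTime, String, and Int32 columns of data_csv.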
With with_column_types=True, execute returns the column names and types along with the rows, so the data can be retrieved by column name.

    from clickhouse_driver import Client

    # client = Client(...)  # Initialize client

    client.execute("""
    CREATE TABLE IF NOT EXISTS test_table
    (
        int_column Int32,
        string_column String,
        array_column Array(Int32)
    )
    ENGINE = CnchMergeTree()
    ORDER BY int_column
    """)

    data_to_insert = [
        (1, 'First', [1, 2, 3]),
        (2, 'Second', [4, 5, 6]),
        (3, 'Third', [7, 8, 9]),
    ]
    client.execute(
        "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
        data_to_insert
    )

    query = "SELECT * FROM test_table"
    # columns is a list of (name, type) pairs
    data, columns = client.execute(query, with_column_types=True)

    # Use a dictionary comprehension to build a dictionary keyed by column name
    column_names = [col[0] for col in columns]
    column_data = {
        col_name: [row[i] for row in data]
        for i, col_name in enumerate(column_names)
    }

    # Access by column name
    print("int_column:", column_data['int_column'])
    print("string_column:", column_data['string_column'])
    print("array_column:", column_data['array_column'])

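The example above groups values by column. When one dictionary per row is more convenient, the same (rows, columns) pair can be reshaped as sketched below. The helper rows_to_dicts is hypothetical, and the sample data and columns values only mimic the shape returned by execute with with_column_types=True, so the snippet runs without a connection.

```python
def rows_to_dicts(data, columns):
    """Hypothetical helper: turn row tuples into dicts keyed by column name."""
    names = [name for name, _type in columns]
    return [dict(zip(names, row)) for row in data]


# Sample values shaped like the result of
# client.execute(query, with_column_types=True)
data = [(1, 'First', [1, 2, 3]), (2, 'Second', [4, 5, 6])]
columns = [
    ('int_column', 'Int32'),
    ('string_column', 'String'),
    ('array_column', 'Array(Int32)'),
]

records = rows_to_dicts(data, columns)
for record in records:
    print(record['int_column'], record['string_column'], record['array_column'])
```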