You need to enable JavaScript to run this app.
导航
ClickHouse Python Driver
最近更新时间:2024.12.09 17:41:28首次发布时间:2023.12.18 12:14:26

本文介绍如何在 Python 开发环境连接并访问 ByteHouse 云数仓。
ByteHouse 兼容下列开源 ClickHouse Python Driver 程序:

  • mymarilyn/clickhouse-driver (非 Clickhouse 官方驱动,本文已在程序 0.2.9 版本下验证)

说明

需要 Python 3.8 或更高版本的支持。

前提条件

访问 ByteHouse 所需的连接信息,请参考 获取连接信息 获取。

程序安装

安装依赖项

您可能需要安装以下依赖项:

apk add gcc musl-dev

从 PyPI 安装

可以通过如下命令,获取最新发布版本的 clickhouse-driver。

pip3 install clickhouse-driver

从 github 安装

开发版本通过如下命令安装。

pip3 install git+https://github.com/mymarilyn/clickhouse-driver@master#egg=clickhouse-driver

连接示例
本章节介绍通过 mymarilyn/clickhouse-driver 程序连接 ByteHouse 的基本用法,您可以在程序 Github 主页 获取最新的文档和发布版本信息。

支持数据类型

Data Types

数据类型

Insert Type

Select Type

[U]Int8/16/32/64/128/256

int, long

int

Float32/64

float, int, long

float

Date/Date32

date, datetime

date

DateTime

datetime, int, long

datetime

String/FixedString(N)

str, bytes

str, bytes

Enum8/16

Enum, int, long, str

str

Array(T)

list, tuple

list

Bool

bool

bool

UUID

str, UUID

str

Decimal

Decimal, float, int, long

Decimal

IPv4/IPv6

IPv4Address/IPv6Address, int, long, str

IPv4Address/IPv6Address

使用示例

设置连接信息

可参考下面代码样例设置 ByteHouse 连接信息,按照获取 ByteHouse 连接信息中信息替换下面的 HOST、PORT 和 API_KEY 字段。

  1. 通过传递参数的方式:
from clickhouse_driver import Client

client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
)
  1. 通过连接字符串的方式。
from clickhouse_driver import Client

uri = "clickhouse://bytehouse:{API_KEY}@{HOST}:19000/{DATABASE}?secure=True"
client = Client.from_url(uri)

基本用法示例

请参阅以下基本查询的示例代码。按照获取 ByteHouse 连接信息中信息替换下面的 HOST、PORT 和 API_KEY 字段。

from clickhouse_driver import Client
from datetime import datetime

# Connect to ByteHouse
client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
)

# Create Table
create_table_query = """
CREATE TABLE IF NOT EXISTS example_table (
    id Int32,
    name String,
    created_at DateTime
)
ENGINE = CnchMergeTree()
ORDER BY id;
"""
client.execute(create_table_query)

# Insert Data
data = [
    (1, 'Alice', datetime(2024, 11, 20, 10, 30, 0)),
    (2, 'Bob', datetime(2024, 11, 20, 11, 0, 0)),
    (3, 'Charlie', datetime(2024, 11, 20, 12, 15, 0)),
]
client.execute("INSERT INTO example_table (id, name, created_at) VALUES", data)

# Query Data
rows = client.execute("SELECT * FROM example_table")
for row in rows:
    print(row)

指定服务器设置

如果ByteHouse服务器支持某设置(例如virtual_warehouse),而开源驱动包中没有该设置,可以通过参考下面的示例增加。

from clickhouse_driver import Client
from clickhouse_driver.settings.available import settings as available_settings, SettingString

available_settings['virtual_warehouse'] = SettingString

client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
    settings={"virtual_warehouse": "vw-123456789-vw-name"},
)

指定查询 ID

from clickhouse_driver import Client

# client = Client(...) # Initialize client

query_id = "bbd7dea3-eb63-4a21-b727-f55b420a7223"
client.execute("SELECT 1", query_id=query_id)

从 Pandas dataframe 插入数据

'use_numpy': True 设置需要 Numpy / Pandas 包的支持,可参考下面的示例。

from clickhouse_driver import Client
import pandas as pd

client = Client(
    host="{HOST}",
    port=19000,
    user="bytehouse",
    password="{API_KEY}",
    database="{DATABASE}",
    secure=True,
    settings={"use_numpy": True},
)

client.execute(
   'CREATE TABLE test_df (x Int, y String) Engine = CnchMergeTree() ORDER BY tuple()'
)

df_insert = pd.DataFrame({
    'x': [1, 2],
    'y': ['apple', 'banana'],
}, dtype=object)

client.insert_dataframe('INSERT INTO test_df VALUES', df_insert)

df_select = client.query_dataframe('SELECT * FROM test_df')

print(df_select)

#    x       y
# 0  1   apple
# 1  2  banana

从 CSV 文件插入数据

假设您的 CSV 文件中有以下数据:

time,order,qty
2019-08-01 15:23:14,New order1,5
2019-08-05 09:14:45,New order2,3
2019-08-13 12:20:32,New order3,7

可以通过以下方式将数据插入ByteHouse:

from csv import DictReader
from datetime import datetime

from clickhouse_driver import Client

# client = Client(...) # Initialize client

def iter_csv(filename):
    converters = {
        'time': lambda x: datetime.strptime(x, '%Y-%m-%d %H:%M:%S'),
        'order': str,
        'qty': int,
    }
    with open(filename, 'r') as f:
        reader = DictReader(f)
        for line in reader:
            yield {k: (converters[k](v) if k in converters else v) for k, v in line.items()}


client.execute(
    'CREATE TABLE IF NOT EXISTS data_csv (time DateTime, order String, qty Int32) Engine = CnchMergeTree() ORDER BY tuple()'
)
client.execute('INSERT INTO data_csv VALUES', iter_csv('data.csv'))

根据列名检索数据

通过设置“with_column_types”= True ,可以使用按列名检索数据。

from clickhouse_driver import Client

# client = Client(...) # Initialize client

client.execute("""
CREATE TABLE IF NOT EXISTS test_table (
    int_column Int32,
    string_column String,
    array_column Array(Int32)
) ENGINE = CnchMergeTree()
ORDER BY int_column
""")

data_to_insert = [
    (1, 'First', [1, 2, 3]),
    (2, 'Second', [4, 5, 6]),
    (3, 'Third', [7, 8, 9])
]

client.execute(
    "INSERT INTO test_table (int_column, string_column, array_column) VALUES",
    data_to_insert
)

query = "SELECT * FROM test_table"
data, columns = client.execute(query, with_column_types=True)

# Use dictionary comprehension to create a dictionary with column names as keys
column_data = {col_name: [row[i] for row in data] for i, col_name in enumerate([col[0] for col in columns])}

# Access by column names
print("int_column:", column_data['int_column'])
print("string_column:", column_data['string_column'])
print("array_column:", column_data['array_column'])