You need to enable JavaScript to run this app.
导航
ClickHouse SqlAlchemy Connector
最近更新时间:2025.03.19 16:14:44首次发布时间:2025.03.19 16:14:44
我的收藏
有用
有用
无用
无用

本文介绍如何使用 ClickHouse SqlAlchemy Connector 连接并访问 ByteHouse 云数仓。

注意事项

细分项

已验证版本/注意事项

SqlAlchemy 版本

2.0.38

ClickHouse Connector 版本

0.3.2

注意

当前仅支持通过 clickhouse-driver 以基于 TCP 的原生(Native)协议进行连接。

ByteHouse clickhouse-connect 支持包版本

0.3.2+bytehouse

说明

由于开源的 ClickHouse SqlAlchemy Connector 无法直接连接 ByteHouse,您还需要安装 ByteHouse clickhouse-connect 支持包(即上述 0.3.2+bytehouse 版本),由该支持包对开源连接器进行适配。

Python 版本

Python3.10

安装驱动

通过 pip 安装 ByteHouse clickhouse-connect支持包。

pip install https://github.com/bytehouse-docs/ecosystem/raw/main/dist/clickhouse-sqlalchemy/clickhouse-sqlalchemy-0.3.2+bytehouse.tar.gz

连接ByteHouse

可参考下面代码样例连接ByteHouse。

注意

当前仅支持通过 clickhouse-driver 以基于 TCP 的原生(Native)协议进行连接。

from sqlalchemy import create_engine, text
from clickhouse_sqlalchemy import make_session

from urllib.parse import quote

# Connection settings. Replace the placeholders with the values of your tenant.
host = 'tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com'
port = 19000  # TCP driver port
database = 'YOUR_DATABASE'
user = 'bytehouse'  # user is always bytehouse
password = 'YOUR_API_KEY'
secure = True
verify = False

# Construct connection string.
# The password is percent-encoded so that reserved URL characters in an API
# key (for example '@', ':', '/' or '%') cannot corrupt the URL when parsed.
connection_string = (
    f'clickhouse+native://{user}:{quote(password, safe="")}@{host}:{port}/{database}'
    f'?secure={secure}&verify={verify}'
)

engine = create_engine(connection_string)
# The session is created with clickhouse_sqlalchemy.make_session rather than
# the standard SQLAlchemy session API.
session = make_session(engine)

try:
    # Run a trivial query to confirm that the connection works.
    result = session.execute(text('SELECT 1'))
    value = result.scalar()
    print(f"Result: {value}")
finally:
    # Always release the session, even if the query fails.
    session.close()

参数

配置要点

host

配置为ByteHouse的公网连接域名,其中{TENANT_ID}、{REGION}分别为火山引擎主账号的账号ID和ByteHouse的地域信息,您可以在ByteHouse控制台的 租户管理>基本信息>网络信息 中查看对应信息。

port

配置为TCP连接端口:19000

database

配置为连接ByteHouse的数据库名称。

user & password

  • user:固定配置为bytehouse
  • password:为ByteHouse的<API_Key>,您可以在ByteHouse控制台的 租户管理>连接信息 中获取API Key。

secure & verify

  • secure:配置为true
  • verify:配置为false

使用示例

连接ByteHouse后,您可以在SQLAlchemy中定义数据模型、使用不同数据类型、进行建表、数据读写等操作,以下为简单的操作示例。

示例1:定义数据模型

注意

  • 创建会话(session)时,必须使用clickhouse_sqlalchemy.make_session方法来创建这个会话对象,如果通过其他方式(例如直接使用SQLAlchemy的标准API)创建会话,则该会话对象将缺乏对ClickHouse特有SQL扩展的支持。
  • Table类和get_declarative_base函数也类似,需使用clickhouse_sqlalchemy提供的相应方法来创建,而不是直接使用SQLAlchemy的标准实现。
from sqlalchemy import Column, MetaData, create_engine
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session

from urllib.parse import quote

# Create connection.
# NOTE: user, password, host, port, database, secure and verify are not
# defined in this snippet; set them first, as in the connection example.
# The password is percent-encoded so that reserved URL characters in it
# (for example '@', ':', '/' or '%') cannot corrupt the URL when parsed.
connection_string = (
    f'clickhouse+native://{user}:{quote(password, safe="")}@{host}:{port}/{database}'
    f'?secure={secure}&verify={verify}'
)
engine = create_engine(connection_string)

# Create metadata and base (get_declarative_base comes from clickhouse_sqlalchemy)
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model
class PageViews(Base):
    """Declarative model mapped to the ``page_views`` table.

    The table uses the ``CnchMergeTree`` engine, configured with
    ``order_by=('id', 'timestamp')`` and ``primary_key=('id',)``.
    """

    __tablename__ = 'page_views'

    # Columns use the ClickHouse-specific types from clickhouse_sqlalchemy.types.
    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)

    # Table engine definition; passed as a one-element tuple.
    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

# Create every table registered on Base's metadata (here: page_views)
Base.metadata.create_all(bind=engine)

示例2:使用不同的数据类型

from sqlalchemy import Column, MetaData, create_engine
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime

from urllib.parse import quote

# Create connection.
# NOTE: user, password, host, port, database, secure and verify are not
# defined in this snippet; set them first, as in the connection example.
# The password is percent-encoded so that reserved URL characters in it
# (for example '@', ':', '/' or '%') cannot corrupt the URL when parsed.
connection_string = (
    f'clickhouse+native://{user}:{quote(password, safe="")}@{host}:{port}/{database}'
    f'?secure={secure}&verify={verify}'
)
engine = create_engine(connection_string)

# Create metadata and base (get_declarative_base comes from clickhouse_sqlalchemy)
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model with various column types
class SampleData(Base):
    """Declarative model mapped to ``sample_data``, showing several column types.

    Covers numeric, string, date/time, array and nullable columns. The table
    uses the ``CnchMergeTree`` engine, configured with
    ``order_by=('id', 'datetime_value')`` and ``primary_key=('id',)``.
    """

    __tablename__ = 'sample_data'

    # Primary key
    id = Column(types.UInt32, primary_key=True)
    
    # Numeric types: signed and unsigned integers, float, fixed-point decimal
    int_value = Column(types.Int32)
    uint_value = Column(types.UInt64)
    float_value = Column(types.Float64)
    decimal_value = Column(types.Decimal(10, 2))  # precision 10, scale 2
    
    # String types: unbounded string and a string declared with length 20
    text_value = Column(types.String)
    fixed_text = Column(types.String(20))  # FixedString(20)
    
    # Date and Time
    date_value = Column(types.Date)
    datetime_value = Column(types.DateTime)
    datetime64_value = Column(types.DateTime64(3))  # precision 3
    
    # Array types: the element type is passed to types.Array
    int_array = Column(types.Array(types.Int32))
    string_array = Column(types.Array(types.String))
    
    # Nullable types: the wrapped type is passed to types.Nullable
    nullable_int = Column(types.Nullable(types.Int32))
    nullable_string = Column(types.Nullable(types.String))

    # Table engine definition; passed as a one-element tuple.
    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'datetime_value'),
            primary_key=('id',)
        ),
    )

# Create table
Base.metadata.create_all(engine)

# Create session and insert sample data
session = make_session(engine)

try:
    # Take a single timestamp so that the date, datetime and datetime64
    # columns of the sample row all describe the same instant.
    now = datetime.now()

    # Create sample record
    sample = SampleData(
        id=1,
        int_value=-100,
        uint_value=100,
        float_value=3.14,
        decimal_value=123.45,
        text_value='Hello ClickHouse',
        fixed_text='Fixed Length Text',
        date_value=now.date(),
        datetime_value=now,
        datetime64_value=now,
        int_array=[1, 2, 3, 4, 5],
        string_array=['a', 'b', 'c'],
        nullable_int=None,
        nullable_string='Not null value'
    )

    # Insert the record
    session.add(sample)
    session.commit()

    # Query the data back by primary key value
    result = session.query(SampleData).filter(SampleData.id == 1).first()
    if result:
        print("Retrieved data:")
        print(f"ID: {result.id}")
        print(f"Int Value: {result.int_value}")
        print(f"Text Value: {result.text_value}")
        print(f"DateTime: {result.datetime_value}")
        print(f"Int Array: {result.int_array}")
        print(f"Nullable Int: {result.nullable_int}")

except Exception as e:
    # Example code: report the failure instead of propagating it.
    print(f"An error occurred: {e}")
    session.rollback()
finally:
    # Always release the session.
    session.close()

示例3:读取数据

from sqlalchemy import Column, MetaData, create_engine, func, text
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime, timedelta

from urllib.parse import quote

# Create connection.
# NOTE: user, password, host, port, database, secure and verify are not
# defined in this snippet; set them first, as in the connection example.
# The password is percent-encoded so that reserved URL characters in it
# (for example '@', ':', '/' or '%') cannot corrupt the URL when parsed.
connection_string = (
    f'clickhouse+native://{user}:{quote(password, safe="")}@{host}:{port}/{database}'
    f'?secure={secure}&verify={verify}'
)
engine = create_engine(connection_string)

# Create metadata and base (get_declarative_base comes from clickhouse_sqlalchemy)
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model
class PageViewsExample(Base):
    """Declarative model mapped to the ``page_views_example`` table.

    The table uses the ``CnchMergeTree`` engine, configured with
    ``order_by=('id', 'timestamp')`` and ``primary_key=('id',)``.
    """

    __tablename__ = 'page_views_example'

    # Columns use the ClickHouse-specific types from clickhouse_sqlalchemy.types.
    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)
    visit_duration = Column(types.UInt32)
    browser = Column(types.String)
    country = Column(types.String)

    # Table engine definition; passed as a one-element tuple.
    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

# Create tables
Base.metadata.create_all(engine)

# Create session
session = make_session(engine)

try:
    # Insert some sample data: ten rows with ids 1..10, each one hour older
    # than the previous one, spread over three users, two browsers and
    # three countries.
    sample_data = [
        PageViewsExample(
            id=i,
            page_url=f'/page{i}',
            timestamp=datetime.now() - timedelta(hours=i),
            user_id=i % 3 + 1,
            visit_duration=i * 60,
            browser=f'Browser{i % 2 + 1}',
            country=f'Country{i % 3 + 1}'
        ) for i in range(1, 11)
    ]
    session.bulk_save_objects(sample_data)
    session.commit()

    # Different query examples
    print("\n1. Select all records")
    all_records = session.query(PageViewsExample).all()
    for record in all_records:
        print(f"ID: {record.id}, URL: {record.page_url}, Time: {record.timestamp}")

    print("\n2. Filter by last 24 hours")
    yesterday = datetime.now() - timedelta(days=1)
    recent_records = session.query(PageViewsExample)\
        .filter(PageViewsExample.timestamp > yesterday)\
        .all()
    print(f"Found {len(recent_records)} records in last 24 hours")

    print("\n3. Latest 5 page views")
    latest_records = session.query(PageViewsExample)\
        .order_by(PageViewsExample.timestamp.desc())\
        .limit(5)\
        .all()
    for record in latest_records:
        print(f"Time: {record.timestamp}, URL: {record.page_url}")

    print("\n4. Complex filtering")
    # Several criteria passed to one filter() call are combined with AND.
    filtered_records = session.query(PageViewsExample)\
        .filter(
            PageViewsExample.visit_duration > 120,
            PageViewsExample.browser == 'Browser1'
        )\
        .order_by(PageViewsExample.visit_duration.desc())\
        .all()
    for record in filtered_records:
        print(f"URL: {record.page_url}, Duration: {record.visit_duration}, Browser: {record.browser}")

    print("\n5. Aggregation example")
    stats = session.query(
        PageViewsExample.browser,
        func.count().label('visit_count'),
        func.avg(PageViewsExample.visit_duration).label('avg_duration')
    )\
        .group_by(PageViewsExample.browser)\
        .all()
    for stat in stats:
        print(f"Browser: {stat.browser}, Visits: {stat.visit_count}, Avg Duration: {stat.avg_duration:.2f}")

    print("\n6. Multiple conditions")
    # Read the clock once so both bounds of the range share the same "now".
    now = datetime.now()
    complex_query = session.query(PageViewsExample)\
        .filter(
            PageViewsExample.timestamp.between(
                now - timedelta(days=7),
                now
            )
        )\
        .filter(PageViewsExample.country.in_(['Country1', 'Country2']))\
        .order_by(PageViewsExample.timestamp.desc())\
        .all()
    print(f"Found {len(complex_query)} records from Country1/Country2 in last 7 days")

    print("\n7. Subquery example")
    # Per-user visit counts, used below to keep only users with more than 2 visits.
    subquery = session.query(
        PageViewsExample.user_id,
        func.count().label('visit_count')
    )\
        .group_by(PageViewsExample.user_id)\
        .subquery()

    active_users = session.query(PageViewsExample)\
        .join(
            subquery,
            PageViewsExample.user_id == subquery.c.user_id
        )\
        .filter(subquery.c.visit_count > 2)\
        .distinct()\
        .all()
    print(f"Found {len(active_users)} page views from users with more than 2 visits")

    print("\n8. Raw SQL example")
    raw_sql = text("""
        SELECT 
            user_id,
            COUNT(*) as visit_count,
            AVG(visit_duration) as avg_duration
        FROM page_views_example
        GROUP BY user_id
        HAVING COUNT(*) > 2
    """)
    raw_results = session.execute(raw_sql)
    for row in raw_results:
        print(f"User: {row.user_id}, Visits: {row.visit_count}, Avg Duration: {row.avg_duration:.2f}")

except Exception as e:
    # Example code: report the failure instead of propagating it.
    print(f"An error occurred: {e}")
    session.rollback()
finally:
    # Always release the session.
    session.close()

示例4:写入数据

from sqlalchemy import Column, MetaData, create_engine, select, text
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime, timedelta

from urllib.parse import quote

# Create connection.
# NOTE: user, password, host, port, database, secure and verify are not
# defined in this snippet; set them first, as in the connection example.
# The password is percent-encoded so that reserved URL characters in it
# (for example '@', ':', '/' or '%') cannot corrupt the URL when parsed.
connection_string = (
    f'clickhouse+native://{user}:{quote(password, safe="")}@{host}:{port}/{database}'
    f'?secure={secure}&verify={verify}'
)
engine = create_engine(connection_string)

# Create metadata and base (get_declarative_base comes from clickhouse_sqlalchemy)
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define models
class PageViewsInsert(Base):
    """Declarative model mapped to the ``page_views_insert`` table.

    The table uses the ``CnchMergeTree`` engine, configured with
    ``order_by=('id', 'timestamp')`` and ``primary_key=('id',)``.
    """

    __tablename__ = 'page_views_insert'

    # Columns use the ClickHouse-specific types from clickhouse_sqlalchemy.types.
    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)
    visit_duration = Column(types.UInt32)

    # Table engine definition; passed as a one-element tuple.
    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

class SourceData(Base):
    """Declarative model mapped to the ``source_data`` table.

    It has the same column types as ``PageViewsInsert`` under different
    column names. The table uses the ``CnchMergeTree`` engine, configured
    with ``order_by=('id', 'time')`` and ``primary_key=('id',)``.
    """

    __tablename__ = 'source_data'

    # Columns use the ClickHouse-specific types from clickhouse_sqlalchemy.types.
    id = Column(types.UInt32, primary_key=True)
    url = Column(types.String)
    time = Column(types.DateTime)
    user = Column(types.UInt32)
    duration = Column(types.UInt32)

    # Table engine definition; passed as a one-element tuple.
    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'time'),
            primary_key=('id',)
        ),
    )

# Make sure the tables registered on Base's metadata exist
Base.metadata.create_all(engine)

# Open a session through clickhouse_sqlalchemy
session = make_session(engine)

try:
    # 1. Single insert: add one ORM object and commit
    print("\n1. Performing single insert...")
    single_view = PageViewsInsert(
        id=1,
        page_url='/home',
        timestamp=datetime.now(),
        user_id=123,
        visit_duration=300,
    )
    session.add(single_view)
    session.commit()
    print("Single record inserted successfully")

    # 2. Batch insert: one insert() construct executed with a list of row dicts
    print("\n2. Performing batch insert using execute...")
    batch_rows = [
        dict(
            id=n,
            page_url=f'/page{n}',
            timestamp=datetime.now() - timedelta(hours=n),
            user_id=n + 100,
            visit_duration=n * 60,
        )
        for n in range(2, 5)
    ]
    insert_many = PageViewsInsert.__table__.insert()
    session.execute(insert_many, batch_rows)
    session.commit()
    print("Batch insert using execute completed")

    # 3. Batch insert: ORM objects handed to bulk_save_objects
    print("\n3. Performing batch insert using bulk_save_objects...")
    bulk_views = [
        PageViewsInsert(
            id=n,
            page_url=f'/bulk{n}',
            timestamp=datetime.now() - timedelta(hours=n),
            user_id=n + 200,
            visit_duration=n * 30,
        )
        for n in range(5, 8)
    ]
    session.bulk_save_objects(bulk_views)
    session.commit()
    print("Batch insert using bulk_save_objects completed")

    # 4. Fill the source table that step 5 copies from
    print("\n4. Inserting sample data into source table...")
    source_rows = [
        SourceData(
            id=n,
            url=f'/source{n}',
            time=datetime.now() - timedelta(days=n),
            user=n + 300,
            duration=n * 45,
        )
        for n in range(1, 4)
    ]
    session.bulk_save_objects(source_rows)
    session.commit()
    print("Source data inserted successfully")

    # 5. INSERT ... SELECT: copy the source rows, shifting their ids by 1000
    print("\n5. Performing insert from select...")
    target_columns = ['id', 'page_url', 'timestamp', 'user_id', 'visit_duration']
    source_select = select(
        SourceData.id + 1000,
        SourceData.url,
        SourceData.time,
        SourceData.user,
        SourceData.duration,
    )
    copy_stmt = PageViewsInsert.__table__.insert().from_select(
        target_columns,
        source_select,
    )
    session.execute(copy_stmt)
    session.commit()
    print("Insert from select completed")

    # 6. Read everything back, ordered by id, and show the first five rows
    print("\n6. Verifying inserted data...")
    stored_views = (
        session.query(PageViewsInsert)
        .order_by(PageViewsInsert.id)
        .all()
    )
    print(f"\nTotal records in PageViews: {len(stored_views)}")
    print("\nSample of records:")
    for view in stored_views[:5]:
        summary = (
            f"ID: {view.id}, URL: {view.page_url}, "
            f"User: {view.user_id}, Time: {view.timestamp}"
        )
        print(summary)

except Exception as exc:
    # Example code: report the failure instead of propagating it.
    print(f"\nAn error occurred: {exc}")
    session.rollback()
finally:
    # Always release the session.
    session.close()