This topic describes how to use the ClickHouse SQLAlchemy connector to connect to and access the ByteHouse cloud data warehouse.
| Item | Verified version / notes |
| --- | --- |
| SQLAlchemy version | |
| ClickHouse connector version | **Note**: Currently, only connections through `clickhouse-driver` using the native TCP protocol are supported. |
| ByteHouse clickhouse-sqlalchemy support package version | 0.3.2+bytehouse. **Note**: ByteHouse cannot natively support the open-source `clickhouse-sqlalchemy` package, so install the ByteHouse-adapted build described below. |
| Python version | Python 3.10 |
Install the ByteHouse clickhouse-sqlalchemy support package via pip:
```bash
pip install https://github.com/bytehouse-docs/ecosystem/raw/main/dist/clickhouse-sqlalchemy/clickhouse-sqlalchemy-0.3.2+bytehouse.tar.gz
```
Refer to the following code sample to connect to ByteHouse.

**Note**: Currently, only connections through `clickhouse-driver` using the native TCP protocol are supported.
```python
from sqlalchemy import create_engine, text
from clickhouse_sqlalchemy import make_session

host = 'tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com'
port = 19000  # TCP driver port
database = 'YOUR_DATABASE'
user = 'bytehouse'  # user is always bytehouse
password = 'YOUR_API_KEY'
secure = True
verify = False

# Construct connection string
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'

engine = create_engine(connection_string)
session = make_session(engine)

try:
    result = session.execute(text('SELECT 1'))
    value = result.scalar()
    print(f"Result: {value}")
finally:
    session.close()
```
| Parameter | Configuration notes |
| --- | --- |
| host | Set this to the ByteHouse public endpoint, where {TENANT_ID} and {REGION} are the account ID of your Volcengine primary account and the ByteHouse region, respectively. You can find both in the ByteHouse console under **Tenant Management > Basic Information > Network Information**. |
| port | Set this to the TCP port: 19000. |
| database | Set this to the name of the ByteHouse database to connect to. |
| user & password | `user` is fixed to `bytehouse`; `password` is your ByteHouse API key. |
| secure & verify | `secure=True` enables TLS for the connection; `verify=False` skips server certificate verification. |
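If your API key contains characters that are not URL-safe, embedding it directly in the connection string can break parsing. As a minimal sketch (assuming SQLAlchemy 1.4+, and a hypothetical `BYTEHOUSE_API_KEY` environment variable holding the key), you can build the URL programmatically so the password is escaped for you:

```python
import os

from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Build the connection URL programmatically; URL.create() escapes the
# password, so special characters in the API key are handled safely.
url = URL.create(
    drivername='clickhouse+native',
    username='bytehouse',                      # user is always bytehouse
    password=os.environ['BYTEHOUSE_API_KEY'],  # hypothetical env variable
    host='tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com',
    port=19000,
    database='YOUR_DATABASE',
    query={'secure': 'true', 'verify': 'false'},
)
engine = create_engine(url)
```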
After connecting to ByteHouse, you can define data models in SQLAlchemy, work with different data types, create tables, and read and write data. The following are simple examples of these operations.

**Note**: Create the session object with the `clickhouse_sqlalchemy.make_session` method. A session created any other way (for example, directly through SQLAlchemy's standard API) will lack support for ClickHouse-specific SQL extensions. The same applies to the `Table` class and the `get_declarative_base` function: use the versions provided by `clickhouse_sqlalchemy` instead of SQLAlchemy's standard implementations.

The following example defines a model and creates the corresponding table:

```python
from sqlalchemy import Column, MetaData, create_engine
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session

# Create connection (host, port, user, etc. as defined in the connection example above)
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

# Create metadata and base
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model
class PageViews(Base):
    __tablename__ = 'page_views'

    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)

    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

# Create table
Base.metadata.create_all(engine)
```
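`create_all` only creates tables that do not already exist. When iterating on a schema during development, you may also want to drop and recreate tables. A minimal sketch using SQLAlchemy's standard metadata API (assuming the `Base` and `engine` defined above, and that the dialect emits the corresponding DROP TABLE statements):

```python
# Drop every table registered on this metadata, then recreate them.
# checkfirst=True (the default) skips tables that do not exist.
Base.metadata.drop_all(engine, checkfirst=True)
Base.metadata.create_all(engine, checkfirst=True)
```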
The following example demonstrates the column types supported by the connector:

```python
from sqlalchemy import Column, MetaData, create_engine
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime

# Create connection (parameters as defined in the connection example above)
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

# Create metadata and base
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model with various column types
class SampleData(Base):
    __tablename__ = 'sample_data'

    # Primary key
    id = Column(types.UInt32, primary_key=True)

    # Numeric types
    int_value = Column(types.Int32)
    uint_value = Column(types.UInt64)
    float_value = Column(types.Float64)
    decimal_value = Column(types.Decimal(10, 2))  # precision 10, scale 2

    # String types
    text_value = Column(types.String)
    fixed_text = Column(types.String(20))  # FixedString(20)

    # Date and time types
    date_value = Column(types.Date)
    datetime_value = Column(types.DateTime)
    datetime64_value = Column(types.DateTime64(3))  # precision 3

    # Array types
    int_array = Column(types.Array(types.Int32))
    string_array = Column(types.Array(types.String))

    # Nullable types
    nullable_int = Column(types.Nullable(types.Int32))
    nullable_string = Column(types.Nullable(types.String))

    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'datetime_value'),
            primary_key=('id',)
        ),
    )

# Create table
Base.metadata.create_all(engine)

# Create session and insert sample data
session = make_session(engine)
try:
    # Create sample record
    sample = SampleData(
        id=1,
        int_value=-100,
        uint_value=100,
        float_value=3.14,
        decimal_value=123.45,
        text_value='Hello ClickHouse',
        fixed_text='Fixed Length Text',
        date_value=datetime.now().date(),
        datetime_value=datetime.now(),
        datetime64_value=datetime.now(),
        int_array=[1, 2, 3, 4, 5],
        string_array=['a', 'b', 'c'],
        nullable_int=None,
        nullable_string='Not null value'
    )

    # Insert the record
    session.add(sample)
    session.commit()

    # Query the data
    result = session.query(SampleData).filter(SampleData.id == 1).first()
    if result:
        print("Retrieved data:")
        print(f"ID: {result.id}")
        print(f"Int Value: {result.int_value}")
        print(f"Text Value: {result.text_value}")
        print(f"DateTime: {result.datetime_value}")
        print(f"Int Array: {result.int_array}")
        print(f"Nullable Int: {result.nullable_int}")
except Exception as e:
    print(f"An error occurred: {e}")
    session.rollback()
finally:
    session.close()
```
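Array columns can also be filtered with ClickHouse's array functions through SQLAlchemy's generic `func` construct, which renders any attribute access as a SQL function call. A minimal sketch (assuming the `SampleData` model and `session` from the example above, and that ByteHouse supports ClickHouse's `has()` function):

```python
from sqlalchemy import func

# has(int_array, 3) returns 1 when the array contains the element,
# so this selects rows whose int_array includes the value 3.
rows = session.query(SampleData).filter(func.has(SampleData.int_array, 3)).all()
for row in rows:
    print(row.id, row.int_array)
```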
The following example demonstrates common query patterns, from simple filters to aggregations, subqueries, and raw SQL:

```python
from sqlalchemy import Column, MetaData, create_engine, func, text
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime, timedelta

# Create connection (parameters as defined in the connection example above)
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

# Create metadata and base
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define model
class PageViewsExample(Base):
    __tablename__ = 'page_views_example'

    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)
    visit_duration = Column(types.UInt32)
    browser = Column(types.String)
    country = Column(types.String)

    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

# Create tables
Base.metadata.create_all(engine)

# Create session
session = make_session(engine)

try:
    # Insert some sample data
    sample_data = [
        PageViewsExample(
            id=i,
            page_url=f'/page{i}',
            timestamp=datetime.now() - timedelta(hours=i),
            user_id=i % 3 + 1,
            visit_duration=i * 60,
            browser=f'Browser{i % 2 + 1}',
            country=f'Country{i % 3 + 1}'
        )
        for i in range(1, 11)
    ]
    session.bulk_save_objects(sample_data)
    session.commit()

    # Different query examples
    print("\n1. Select all records")
    all_records = session.query(PageViewsExample).all()
    for record in all_records:
        print(f"ID: {record.id}, URL: {record.page_url}, Time: {record.timestamp}")

    print("\n2. Filter by last 24 hours")
    yesterday = datetime.now() - timedelta(days=1)
    recent_records = session.query(PageViewsExample)\
        .filter(PageViewsExample.timestamp > yesterday)\
        .all()
    print(f"Found {len(recent_records)} records in last 24 hours")

    print("\n3. Latest 5 page views")
    latest_records = session.query(PageViewsExample)\
        .order_by(PageViewsExample.timestamp.desc())\
        .limit(5)\
        .all()
    for record in latest_records:
        print(f"Time: {record.timestamp}, URL: {record.page_url}")

    print("\n4. Complex filtering")
    filtered_records = session.query(PageViewsExample)\
        .filter(
            PageViewsExample.visit_duration > 120,
            PageViewsExample.browser == 'Browser1'
        )\
        .order_by(PageViewsExample.visit_duration.desc())\
        .all()

    print("\n5. Aggregation example")
    stats = session.query(
        PageViewsExample.browser,
        func.count().label('visit_count'),
        func.avg(PageViewsExample.visit_duration).label('avg_duration')
    )\
        .group_by(PageViewsExample.browser)\
        .all()
    for stat in stats:
        print(f"Browser: {stat.browser}, Visits: {stat.visit_count}, Avg Duration: {stat.avg_duration:.2f}")

    print("\n6. Multiple conditions")
    complex_query = session.query(PageViewsExample)\
        .filter(
            PageViewsExample.timestamp.between(
                datetime.now() - timedelta(days=7),
                datetime.now()
            )
        )\
        .filter(PageViewsExample.country.in_(['Country1', 'Country2']))\
        .order_by(PageViewsExample.timestamp.desc())\
        .all()

    print("\n7. Subquery example")
    subquery = session.query(
        PageViewsExample.user_id,
        func.count().label('visit_count')
    )\
        .group_by(PageViewsExample.user_id)\
        .subquery()
    active_users = session.query(PageViewsExample)\
        .join(
            subquery,
            PageViewsExample.user_id == subquery.c.user_id
        )\
        .filter(subquery.c.visit_count > 2)\
        .distinct()\
        .all()

    print("\n8. Raw SQL example")
    raw_sql = text("""
        SELECT
            user_id,
            COUNT(*) as visit_count,
            AVG(visit_duration) as avg_duration
        FROM page_views_example
        GROUP BY user_id
        HAVING COUNT(*) > 2
    """)
    raw_results = session.execute(raw_sql)
    for row in raw_results:
        print(f"User: {row.user_id}, Visits: {row.visit_count}, Avg Duration: {row.avg_duration:.2f}")
except Exception as e:
    print(f"An error occurred: {e}")
    session.rollback()
finally:
    session.close()
```
The following example demonstrates different ways to write data, including single inserts, batch inserts, and INSERT ... SELECT:

```python
from sqlalchemy import Column, MetaData, create_engine, select, text
from clickhouse_sqlalchemy import get_declarative_base, types, engines, make_session
from datetime import datetime, timedelta

# Create connection (parameters as defined in the connection example above)
connection_string = f'clickhouse+native://{user}:{password}@{host}:{port}/{database}?secure={secure}&verify={verify}'
engine = create_engine(connection_string)

# Create metadata and base
metadata = MetaData()
Base = get_declarative_base(metadata=metadata)

# Define models
class PageViewsInsert(Base):
    __tablename__ = 'page_views_insert'

    id = Column(types.UInt32, primary_key=True)
    page_url = Column(types.String)
    timestamp = Column(types.DateTime)
    user_id = Column(types.UInt32)
    visit_duration = Column(types.UInt32)

    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'timestamp'),
            primary_key=('id',)
        ),
    )

class SourceData(Base):
    __tablename__ = 'source_data'

    id = Column(types.UInt32, primary_key=True)
    url = Column(types.String)
    time = Column(types.DateTime)
    user = Column(types.UInt32)
    duration = Column(types.UInt32)

    __table_args__ = (
        engines.CnchMergeTree(
            order_by=('id', 'time'),
            primary_key=('id',)
        ),
    )

# Create tables
Base.metadata.create_all(engine)

# Create session
session = make_session(engine)

try:
    # 1. Single insert
    print("\n1. Performing single insert...")
    new_view = PageViewsInsert(
        id=1,
        page_url='/home',
        timestamp=datetime.now(),
        user_id=123,
        visit_duration=300
    )
    session.add(new_view)
    session.commit()
    print("Single record inserted successfully")

    # 2. Batch insert using session.execute
    print("\n2. Performing batch insert using execute...")
    batch_data = [
        {
            'id': i,
            'page_url': f'/page{i}',
            'timestamp': datetime.now() - timedelta(hours=i),
            'user_id': i + 100,
            'visit_duration': i * 60
        }
        for i in range(2, 5)
    ]
    session.execute(PageViewsInsert.__table__.insert(), batch_data)
    session.commit()
    print("Batch insert using execute completed")

    # 3. Batch insert using bulk_save_objects
    print("\n3. Performing batch insert using bulk_save_objects...")
    bulk_objects = [
        PageViewsInsert(
            id=i,
            page_url=f'/bulk{i}',
            timestamp=datetime.now() - timedelta(hours=i),
            user_id=i + 200,
            visit_duration=i * 30
        )
        for i in range(5, 8)
    ]
    session.bulk_save_objects(bulk_objects)
    session.commit()
    print("Batch insert using bulk_save_objects completed")

    # 4. Insert sample data into source table
    print("\n4. Inserting sample data into source table...")
    source_data = [
        SourceData(
            id=i,
            url=f'/source{i}',
            time=datetime.now() - timedelta(days=i),
            user=i + 300,
            duration=i * 45
        )
        for i in range(1, 4)
    ]
    session.bulk_save_objects(source_data)
    session.commit()
    print("Source data inserted successfully")

    # 5. Insert from select
    print("\n5. Performing insert from select...")
    insert_stmt = PageViewsInsert.__table__.insert().from_select(
        ['id', 'page_url', 'timestamp', 'user_id', 'visit_duration'],
        select(
            SourceData.id + 1000,
            SourceData.url,
            SourceData.time,
            SourceData.user,
            SourceData.duration
        )
    )
    session.execute(insert_stmt)
    session.commit()
    print("Insert from select completed")

    # Verify the data
    print("\n6. Verifying inserted data...")
    all_records = session.query(PageViewsInsert).order_by(PageViewsInsert.id).all()
    print(f"\nTotal records in PageViews: {len(all_records)}")
    print("\nSample of records:")
    for record in all_records[:5]:
        print(f"ID: {record.id}, URL: {record.page_url}, "
              f"User: {record.user_id}, Time: {record.timestamp}")
except Exception as e:
    print(f"\nAn error occurred: {e}")
    session.rollback()
finally:
    session.close()
```
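For large data volumes, inserting everything in one statement can exhaust client memory; writing in fixed-size chunks keeps each insert bounded. A minimal sketch of a hypothetical `insert_in_chunks` helper (assuming the `PageViewsInsert` model and `session` from the example above):

```python
from datetime import datetime
from itertools import islice

def insert_in_chunks(session, table, rows, chunk_size=10000):
    """Insert an iterable of row dicts in fixed-size chunks.

    `rows` may be any iterable (including a generator), so the full
    dataset never has to be materialized in memory at once.
    """
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            break
        session.execute(table.insert(), chunk)
        session.commit()

# Usage: stream 100k generated rows in chunks of 10k.
rows = (
    {'id': i, 'page_url': f'/gen{i}', 'timestamp': datetime.now(),
     'user_id': i % 50, 'visit_duration': 60}
    for i in range(10000, 110000)
)
insert_in_chunks(session, PageViewsInsert.__table__, rows)
```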