You need to enable JavaScript to run this app.
导航
ClickHouse Go Driver
最近更新时间:2025.03.28 15:56:33首次发布时间:2023.12.18 12:14:26
我的收藏
有用
有用
无用
无用

您可以通过开源的 ClickHouse Go 驱动程序连接到 ByteHouse 云数仓版,连接后进行数据写入查询等操作。本文为您介绍ClickHouse Go驱动连接ByteHouse 云数仓版的主要操作流程和操作要点。

背景信息

由于ByteHouse的Go驱动当前在维护升级中,您可以使用开源ClickHouse的Go驱动连接ByteHouse,开源ClickHouse支持的API接口和协议如下。

细分项

能力说明

API接口

支持:

  • clickhouse client接口(推荐):提供了最佳的性能,并支持一些ClickHouse的特殊能力,例如列写入。
  • database/sql接口:如果您需要连接多个数据库、或者希望使用基于标准database/sql接口的工具和库,您可以使用本接口。

协议

支持:

  • TCP/Native协议(推荐):建议使用TCP/Native协议以获得最佳性能。
  • HTTP协议:仅database/sql接口支持,如果您需要兼容防火墙和代理,您可以使用HTTP协议。

准备工作

下载驱动

驱动

已验证版本/注意事项

ClickHouse Go基础驱动

2.30.0,驱动下载链接

Go

Golang 1.21

安装驱动

您可以通过go get安装。

go get github.com/ClickHouse/clickhouse-go/v2@v2.30.0

连接ByteHouse

连接信息

请参考获取 ByteHouse 连接信息,了解如何通过API Key或用户名+密码的方式连接到ByteHouse。

使用TCP/Native协议

  • 通过传递参数创建连接

    package main
    
    import (
            "crypto/tls"
            "fmt"
            "github.com/ClickHouse/clickhouse-go/v2"
    )
    
    func main() {
        db := clickhouse.OpenDB(&clickhouse.Options{
                Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)},
                Auth: clickhouse.Auth{
                        Database: "{DATABASE}",
                        Username: "bytehouse",
                        Password: "{API_KEY}",
                },
                TLS: &tls.Config{
                        InsecureSkipVerify: true,
                },
        })
    
        if err := db.Ping(); err != nil {
                panic(err)
        }
    }
    
  • 通过DSN(Data Source Name,数据源名称)创建连接

    package main
    
    import (
            "database/sql"
            _ "github.com/ClickHouse/clickhouse-go/v2"
    )
    
    func main() {
        db, err := sql.Open("clickhouse", "clickhouse://bytehouse:{API_KEY}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:19000/{DATABASE}?secure=true")
        if err != nil {
                panic(err)
        }
    
        if err := db.Ping(); err != nil {
                panic(err)
        }
    }
    

使用HTTP协议

  • 通过传递参数创建连接

    package main
    
    import (
            "crypto/tls"
            "github.com/ClickHouse/clickhouse-go/v2"
            _ "github.com/ClickHouse/clickhouse-go/v2"
    )
    
    func main() {
        conn := clickhouse.OpenDB(&clickhouse.Options{
                Addr: []string{"tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123"},
                Auth: clickhouse.Auth{
                        Database: "{DATABASE}",
                        Username: "bytehouse",
                        Password: "{API_KEY}",
                },
                Protocol: clickhouse.HTTP,
                TLS: &tls.Config{
                        InsecureSkipVerify: false,
                },
                Settings: clickhouse.Settings{
                        "describe_query_with_data_type_flags": "0",
                },
        })
    
        if err := conn.Ping(); err != nil {
                panic(err)
        }
    }
    
  • 通过DSN(Data Source Name,数据源名称)创建连接

    package main
    
    import (
            "database/sql"
            _ "github.com/ClickHouse/clickhouse-go/v2"
    )
    
    func main() {
        db, err := sql.Open("clickhouse", "https://bytehouse:{API_KEY}@tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com:8123/{DATABASE}?secure=true&describe_query_with_data_type_flags=0")
        if err != nil {
                panic(err)
        }
    
        if err := db.Ping(); err != nil {
                panic(err)
        }
    }
    

(可选)设置Server参数

您可以在自定义参数中设置更多Server参数,例如设置计算组资源。

db := clickhouse.OpenDB(&clickhouse.Options{
    Addr: []string{fmt.Sprintf("%s:%d", "tenant-{TENANT_ID}-{REGION}-public.bytehouse.volces.com", 19000)},
    Auth: clickhouse.Auth{
        Database: "{DATABASE}",
        Username: "bytehouse",
        Password: "{API_KEY}",
    },
    TLS: &tls.Config{
        InsecureSkipVerify: true,
    },
    Settings: clickhouse.Settings{
        "virtual_warehouse": "{VW_ID}",
    },
})

结果验证

通用操作

可以使用Exec方法执行任意操作。该方法不支持接收上下文,默认情况下它与后台上下文一起执行。如果需要,用户可以使用ExecContext

db.Exec("DROP TABLE IF EXISTS example")
_, err = db.Exec(
    "CREATE TABLE IF NOT EXISTS example (Col1 UInt8, Col2 String) engine = CnchMergeTree order by tuple()",
)
if err != nil {
    return err
}
db.Exec("INSERT INTO example VALUES (1, 'test-1')")
db.Exec("DROP TABLE IF EXISTS example")

批量写入数据

说明

当前ByteHouse不支持Asynchronous Insert(异步插入)。

  1. 使用db. Begin()启动事务
  2. 准备一个批处理语句使用tx.Prepare("INSERT INTO...")
  3. 使用batch. Exec()为每一行执行批量插入
  4. 使用tx. Commit()提交事务以发送所有行
func BatchInsert() error {
    db, err := GetSqlDb()
    if err != nil {
            return err
    }

    defer func() {
            db.Exec("DROP TABLE example")
    }()

    db.Exec("DROP TABLE IF EXISTS example")
    _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS example (
        Col1 UInt8
    , Col2 String
    , Col3 FixedString(3)
    , Col4 UUID
    , Col5 Map(String, UInt8)
    , Col6 Array(String)
    , Col7 Tuple(String, UInt8, Array(Map(String, String)))
    , Col8 DateTime
) Engine = CnchMergeTree() ORDER BY tuple()
`)
    if err != nil {
            return err
    }

    scope, err := db.Begin()
    if err != nil {
            return err
    }
    batch, err := scope.Prepare("INSERT INTO example")
    if err != nil {
            return err
    }
    for i := 0; i < 1000; i++ {
            _, err := batch.Exec(
                    uint8(42),
                    "ClickHouse", "Inc",
                    uuid.New(),
                    map[string]uint8{"key": 1},             // Map(String, UInt8)
                    []string{"Q", "W", "E", "R", "T", "Y"}, // Array(String)
                    []any{ // Tuple(String, UInt8, Array(Map(String, String)))
                            "String Value", uint8(5), []map[string]string{
                                    {"key": "value"},
                                    {"key": "value"},
                                    {"key": "value"},
                            },
                    },
                    time.Now(),
            )
            if err != nil {
                    return err
            }
    }
    return scope.Commit()
}

查询写入结果

可以使用QueryRow/Query方法进行查询。

row := db.QueryRow("SELECT * FROM example")
var (
    col1             uint8
    col2, col3, col4 string
    col5             map[string]uint8
    col6             []string
    col7             interface{}
    col8             time.Time
)
if err := row.Scan(&col1, &col2, &col3, &col4, &col5, &col6, &col7, &col8); err != nil {
    return err
}

使用上下文

传递查询ID

func UseContextSendQueryId() error {
    db, err := GetSqlDb()
    if err != nil {
            return err
    }

    // We can use context to send query id
    var one uint8
    queryId, _ := uuid.NewUUID()
    ctx := clickhouse.Context(context.Background(), clickhouse.WithQueryID(queryId.String()))
    if err = db.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
            return err
    }

    return nil
}

传递查询设置

func UseContextSendQuerySetting() error {
    db, err := GetSqlDb()
    if err != nil {
            return err
    }

    // we can use context to pass settings to a specific API call
    ctx := clickhouse.Context(context.Background(), clickhouse.WithSettings(clickhouse.Settings{
            "max_execution_time": 450,
    }))

    var settingValue uint16
    if err := db.QueryRowContext(ctx, "SELECT getSetting('max_execution_time')").Scan(&settingValue); err != nil {
            return fmt.Errorf("failed to get setting value: %v", err)
    }
    if settingValue != 450 {
            return fmt.Errorf("expected setting value to be 450, got %d", settingValue)
    }

    return nil
}