数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费 Canal Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal Proto 格式的数据。
用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。
运行语言 | 说明 |
---|---|
Go | 通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。 |
Python | 通过示例代码中参数 api_version 指定服务端 Kafka 版本号。 |
Java | 通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。 |
按需安装运行语言环境。
运行语言 | 说明 |
---|---|
Go | 安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。 |
Python |
|
Java |
在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译。
说明
本文以火山引擎定义的 ProtoBuf 为例。
下载 ProtoBuf 文件。
将下载的 canal.proto
文件编译成对应语言的代码。编译方法如下所示:
ProtoBuf
文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go。ProtoBuf
文件编译成 Python 语言的代码,获取 canal_pb2
文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Python。protoc -I=. --python_out=. canal.proto
ProtoBuf
文件编译成 Java 语言的代码,获取 canal.Canal
文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java。protoc -I=. --java_out=. canal.proto
本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
编辑 .zshrc
文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。
参数 | 说明 | 示例值 |
---|---|---|
GROUP | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
USER | Kafka 用户名。 | test_user |
PASSWORD | Kafka 用户密码。 | Test@Pwd |
TOPIC | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
BROKERS | 目标 DTS 数据订阅通道的私网地址。 | kafka-cndvhw9ves******.kafka.ivolces.com:9092 |
请按需选择以下 demo 示例。
package main import ( "context" "fmt" "log" "os" "strings" "sync" "github.com/Shopify/sarama" "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/canal" protobuf "google.golang.org/protobuf/proto" ) type Handler struct { topic string partitionCount map[int32]int totalCount int mu sync.Mutex } type Config struct { username string password string topic string group string addr string } var ( c Config ) func init() { c.addr = os.Getenv("BROKERS") c.topic = os.Getenv("TOPIC") c.group = os.Getenv("GROUP") c.username = os.Getenv("USER") c.password = os.Getenv("PASSWORD") } func (h *Handler) Setup(session sarama.ConsumerGroupSession) error { fmt.Println("setup") return nil } func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error { fmt.Println("clean up") return nil } func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { fmt.Println("ConsumeClaim") for m := range claim.Messages() { h.handleCanalMsg(m) session.MarkMessage(m, "") session.Commit() } return nil } func (h *Handler) handleCanalMsg(msg *sarama.ConsumerMessage) { h.mu.Lock() defer h.mu.Unlock() h.totalCount++ h.partitionCount[msg.Partition]++ entry := &canal.Entry{} if err := protobuf.Unmarshal(msg.Value, entry); err != nil { panic(err) } fmt.Println("-------------- handle message --------------") fmt.Printf("ServerID:%v\n", entry.GetHeader().GetServerId()) fmt.Printf("Timestamp:%v\n", entry.GetHeader().GetExecuteTime()) fmt.Printf("Database:%v\n", entry.GetHeader().GetSchemaName()) fmt.Printf("Table:%v\n", entry.GetHeader().GetTableName()) if entry.GetEntryType() != canal.EntryType_TRANSACTIONBEGIN && entry.GetEntryType() != canal.EntryType_TRANSACTIONEND { rowChange := &canal.RowChange{} if err := protobuf.Unmarshal(entry.GetStoreValue(), rowChange); err != nil { panic(err) } fmt.Printf("EventType:%v\n", rowChange.GetEventType().String()) if rowChange.GetIsDdl() { fmt.Printf("DDL:%s\n", rowChange.GetSql()) } else { for _, row := range rowChange.GetRowDatas() { var before, after []string for _, col := range row.BeforeColumns { before = append(before, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType())) } for _, col := range row.AfterColumns { after = append(after, fmt.Sprintf("%s[%s(%s)]", col.GetName(), col.GetValue(), col.GetMysqlType())) } fmt.Printf("RowDatas:before=%v after=%v\n", before, after) } } } } func main() { fmt.Printf("config: %+v", c) config := sarama.NewConfig() config.Net.SASL.User = c.username config.Net.SASL.Password = c.password config.Net.SASL.Enable = true config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Version = sarama.V2_2_2_0 topic := c.topic group := c.group addr := strings.Split(c.addr, ",") cons, err := sarama.NewConsumerGroup(addr, group, config) if err != nil { panic(err) } defer cons.Close() handler := &Handler{ topic: topic, partitionCount: make(map[int32]int), } for { err = cons.Consume(context.Background(), []string{handler.topic}, handler) if err != nil { log.Fatalln(err) } } }
说明
test
,表格 demo
为例。在源数据库中,执行以下命令创建一张名为 demo
的表。
CREATE TABLE demo (id_t INT);
预期输出如下所示:
sourceType:MYSQL entryType:ROWDATA executeTime:1692793679 schemaName:test tableName:demo EventType:CREATE DDL:CREATE TABLE demo (id_t INT)
执行以下命令,在 demo
表中插入一条数据:
INSERT INTO demo (id_t) VALUES (1);
预期输出:
sourceType:MYSQL entryType:ROWDATA executeTime:1692793748 schemaName:test tableName:demo EventType:INSERT RowDatas:before=[] after=[id_t[1(int)]]