数据库传输服务 DTS 的数据订阅服务支持使用 Kafka 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go、Java 和 Python 语言消费 Canal 格式的数据。
已安装 protoc,建议使用 protoc 3.18 或以上版本。
说明
您可以执行 protoc -version
查看 protoc 版本。
用于订阅消费数据的客户端需要指定服务端 Kafka 版本号,版本号需为 2.2.x(例如 2.2.2)。您可以在示例代码中指定 Kafka 版本号,具体参数如下表所示。
运行语言 | 说明 |
---|---|
Go | 通过代码示例中参数 config.Version 指定服务端 Kafka 版本号。 |
Python | 通过示例代码中参数 api_version 指定服务端 Kafka 版本号。 |
Java | 通过 maven pom.xml 文件中参数 version 指定服务端 Kafka 版本号。 |
按需安装运行语言环境。
运行语言 | 说明 |
---|---|
Go | 安装 Go,需使用 Go 1.13 或以上版本。您可以执行 go version 查看 Go 的版本。 |
Python |
|
Java |
|
本文以 macOS 操作系统为例,介绍如何关联 Kafka 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
按需选择 Java 消费示例或 Python 消费示例,Python 语言和 Java 语言各消费示例的目录如下所示:
. ├── dts_kafka_consumer_demo.py # 消费 Demo 文件 ├── volc.proto # 火山引擎格式文件 └── volc_pb2.py # 编译 Volc.proto 后的生成的 Python 文件
说明
Go 语言中仅包含一个 Demo 文件。
按需修改 Demo 文件,具体代码如下所示。
参数 | 说明 | 示例值 |
---|---|---|
GROUP | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
USER | Kafka 用户名。 | test_user |
PASSWORD | Kafka 用户密码。 | Test@Pwd |
TOPIC | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
BROKERS | 目标 DTS 数据订阅通道的私网地址。 | kafka-cndvhw9ves******.kafka.ivolces.com:9092 |
package main import ( "context" "fmt" "log" "strings" "sync" "github.com/Shopify/sarama" proto "github.com/volcengine/volc-sdk-golang/example/dts/data-subscription-demo/proto" protobuf "google.golang.org/protobuf/proto" ) type Handler struct { topic string partitionCount map[int32]int totalCount int mu sync.Mutex } type Config struct { username string password string topic string group string brokers string } var ( c Config ) func init() { c.brokers = "your brokers addrress" c.topic = "your topic" c.group = "your group" c.username = "your username" c.password = "your password" } func (h *Handler) Setup(session sarama.ConsumerGroupSession) error { fmt.Println("setup") return nil } func (h *Handler) Cleanup(sarama.ConsumerGroupSession) error { fmt.Println("clean up") return nil } func (h *Handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { fmt.Println("ConsumeClaim") for m := range claim.Messages() { h.handleMsg(m) session.MarkMessage(m, "") session.Commit() } return nil } func (h *Handler) handleMsg(msg *sarama.ConsumerMessage) { h.mu.Lock() defer h.mu.Unlock() h.totalCount++ h.partitionCount[msg.Partition]++ entry := &proto.Entry{} if err := protobuf.Unmarshal(msg.Value, entry); err != nil { panic(err) } fmt.Println("-------------- handle message --------------") fmt.Printf("get message EventType:%v ", entry.EntryType.String()) switch entry.GetEntryType() { case proto.EntryType_DDL: event := entry.GetDdlEvent() fmt.Printf("ddl %v ", event.Sql) case proto.EntryType_DML: event := entry.GetDmlEvent() cols := event.ColumnDefs for _, row := range event.Rows { var before, after []string for i, col := range row.BeforeCols { before = append(before, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue())) } for i, col := range row.AfterCols { after = append(after, fmt.Sprintf("%+v[%+v]", cols[i].GetName(), col.GetValue())) } fmt.Printf("get row before=%v after=%v ", before, after) } } fmt.Printf("fetch message partition=%v key=%v ", msg.Partition, string(msg.Key)) fmt.Printf("count partition-count=%v total-count=%v ", h.partitionCount, h.totalCount) } func main() { fmt.Printf("config: %+v", c) config := sarama.NewConfig() config.Net.SASL.User = c.username config.Net.SASL.Password = c.password config.Net.SASL.Enable = true config.Consumer.Offsets.Initial = sarama.OffsetNewest config.Version = sarama.V2_2_2_0 topic := c.topic group := c.group addr := strings.Split(c.brokers, ",") cons, err := sarama.NewConsumerGroup(addr, group, config) if err != nil { panic(err) } defer cons.Close() handler := &Handler{ topic: topic, partitionCount: make(map[int32]int), } for { err = cons.Consume(context.Background(), []string{handler.topic}, handler) if err != nil { log.Fatalln(err) } } }
说明
test
,表格 demo
为例。在源数据库中,执行以下命令创建一张名为 demo
的表。
CREATE TABLE demo (id_t INT);
预期输出:
src_type:MySQL entry_type:DDL timestamp:1639057424 server_id:"105198****" database:"test" table:"demo" ddl_event:{sql:"create table demo (id_t int)"}
执行以下命令,在 demo
表中插入一条数据:
INSERT INTO demo (id_t) VALUES (1);
预期输出:
src_type:MySQL entry_type:DML timestamp:1639057434 server_id:"105198****" database:"test" table:"demo" dml_event:{ type:INSERT table_id:"148" column_defs:{ index:1 type:INTEGER OriginType:"int" name:"id_t" is_nullable:true is_unsigned:false } rows:{ after_cols:{ is_null:false int64_value:1 } } }