数据库传输服务 DTS 的数据订阅服务支持使用 RocketMQ 客户端消费火山引擎 Proto 格式的订阅数据。本文以订阅云数据库 MySQL 版实例为例,介绍如何使用 Go 和 Java 语言消费 Canal 格式的数据。
运行环境 | 说明 |
---|---|
Go 语言 | 安装 Go,需使用 Go 1.13 或以上版本。 说明 您可以执行 |
Java 语言 |
|
在运行对应语言的 demo 时,需要先根据以下操作步骤完成 Protocol Buffers(也称 ProtoBuf)文件的下载及编译:
说明
本文以火山引擎定义的 ProtoBuf 为例。
下载 ProtoBuf 文件。
将 ProtoBuf
文件编译成对应语言的代码。编译方法如下所示:
Go 语言
由于数据库传输服务的开发人员已经帮助您将 ProtoBuf
文件编译成 Go 语言的代码,您无需再编译。关于编译的更多信息,请参见 Protocol Buffer Basics: Go。
Java 语言
当您的语言是 Java 时,请执行以下命令将 ProtoBuf
文件编译成 Java 语言的代码,获取 volc.Volc
文件。关于编译的更多信息,请参见 Protocol Buffer Basics: Java。
protoc -I=. --java_out=. volc.proto
本文以 macOS 操作系统为例,介绍如何关联 RocketMQ 和订阅任务。
在目标数据订阅通道中新增消费组。详细信息,请参见新建消费组。
编辑 .zshrc
文件,配置以下环境变量信息,并完成认证,即可调用 SDK 来消费消息数据。
参数 | 说明 | 示例值 |
---|---|---|
GROUP | 消费组名称。 | 285fef6b91754d0bbaab32e4976c****:test_dtssdk |
ACCESSKEY | RocketMQ 用户名。 | 7ZrfJmOpVks5wvurbUgr**** |
SECRETKEY | RocketMQ 用户密码。 | q70s5SrCgsePZPh3fb0d**** |
TOPIC | 目标 DTS 数据订阅通道的 Topic。 | d73e98e7fa9340faa3a0d4ccfa10**** |
NAMESRV_ADDR | 目标 DTS 数据订阅通道的私网地址。 | http://rocketmq-cndv83ef3******.rocketmq.ivolces.com:9876 |
说明
关于以上表格的详细信息,请参见查看 RocketMQ 实例详情。
按需选择以下 demo 示例。
package main import ( "context" "datasubscription/proto" "fmt" "os" "strings" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/sirupsen/logrus" protobuf "google.golang.org/protobuf/proto" ) type Config struct { accessKey string secretKey string topic string group string namesrvAddr string } var c Config func init() { c.namesrvAddr = os.Getenv("NAMESRV_ADDR") c.topic = os.Getenv("TOPIC") c.group = os.Getenv("GROUP") c.accessKey = os.Getenv("ACCESSKEY") c.secretKey = os.Getenv("SECRETKEY") } func main() { logrus.Infof("config: %+v", c) cli, _ := rocketmq.NewPushConsumer( consumer.WithGroupName(c.group), consumer.WithNsResolver(primitive.NewPassthroughResolver(strings.Split(c.namesrvAddr, ","))), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumerOrder(true), consumer.WithCredentials(primitive.Credentials{ AccessKey: c.accessKey, SecretKey: c.secretKey, }), ) err := cli.Subscribe(c.topic, consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for _, msg := range msgs { entry := &proto.Entry{} if err := protobuf.Unmarshal(msg.Body, entry); err != nil { panic(err) } logrus.WithField("EntryType", entry.EntryType.String()).Info("get message") switch entry.GetEntryType() { case proto.EntryType_DDL: event := entry.GetDdlEvent() logrus.Infof("ddl: %s", event.GetSql()) case proto.EntryType_DML: event := entry.GetDmlEvent() cols := event.ColumnDefs for _, row := range event.Rows { var before, after []string for i, col := range row.BeforeCols { before = append(before, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue())) } for i, col := range row.AfterCols { after = append(after, fmt.Sprintf("%s[%v]", cols[i].GetName(), col.GetValue())) } logrus.WithField("after", after).WithField("before", before).Info("get row") } } logrus.WithField("queueId", msg.Queue.QueueId).WithField("key", msg.GetKeys()).Info("fetch message") } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println(err.Error()) } err = cli.Start() if err != nil { fmt.Println(err.Error()) os.Exit(-1) } time.Sleep(time.Hour) cli.Shutdown() }
说明
test
,表格 demo
为例。demo
的表。在CREATE TABLE demo (id_t INT);
demo
表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:INFO[0010] get message EntryType=DDL INFO[0010] ddl: CREATE TABLE demo (id_t INT) INFO[0010] fetch message key="dflow.demo " queueId=8
demo
表中插入一条数据:在INSERT INTO demo (id_t) VALUES (1);
demo
表中打印出数据消费消息的 JSON 数据,如下所示,请按需选择示例代码:INFO[0073] get message EntryType=DML INFO[0073] get row after="[id_t[&{1}]]" before="[]" INFO[0073] fetch message key="dflow.demo " queueId=8