火山引擎DataSail数据订阅SDK提供了客户消费DataSail数据集的能力,用户无需关心消费的细节,只需实现自己的消息处理逻辑。
请确保您已开通了您需要访问的服务。您可前往火山引擎控制台开通全域数据集成服务,详见服务开通。
Access Key(访问密钥)是访问火山引擎服务的安全凭证,包含Access Key ID(简称为AK)和Secret Access Key(简称为SK)两部分。您可登录火山引擎控制台 ,
前往访问控制 的访问密钥 中创建及管理您的Access Key。更多信息可参考访问密钥帮助文档 。
在Datasail确认已有要消费的数据集
申请消费组
Go版本需要不低于1.16。
Java版本需要不低于1.8。
Java版本:
参数名 | 类型 | 配置项含义 | 示例 | |
---|---|---|---|---|
Config | dataset | string | DataSail 数据集 | byteio_dataset_test |
accessKey | string | 火山云 Access Key | AKLTZW***** | |
secretKey | string | 火山云 Secret Key | TW1KaVlURmlaR0******* | |
subscribeCenter | string | 数据订阅配置中心域名 | https://datasail01-cn-beijing.volceapplog.com | |
network | 枚举 | 消费网络类型(公网消费or内网消费) | NETWORK_EXTERNAL | |
Consumer | consumerGroup | string | 消费者组 | test |
autoOffsetReset | 枚举 | 初始无offset时的消费策略 | earliest |
将jar包导入maven本地仓库
mvn install:install-file -Dfile=datasail-subscriber-java-cloud-1.0-SNAPSHOT.jar -DgroupId=com.volcengine.datasail -DartifactId=datasail-subscriber-java-cloud -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
添加以下pom依赖
<dependency> <groupId>com.volcengine.datasail</groupId> <artifactId>datasail-subscriber-java-cloud</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.8</version> </dependency> <dependency> <groupId>com.volcengine</groupId> <artifactId>volc-sdk-java</artifactId> <version>1.0.49</version> </dependency>
消费代码
import com.volcengine.datasail.config.AutoOffsetResetMode; import com.volcengine.datasail.IHandler; import com.volcengine.datasail.Msg; import com.volcengine.datasail.Subscriber; import com.volcengine.datasail.config.Config; import com.volcengine.datasail.config.ConsumerConfig; import com.volcengine.datasail.config.NetworkType; import com.volcengine.datasail.exception.SubscriberException; public class SubscriberTest{ public static void main(String[] args) throws SubscriberException { // 1. Subscriber配置参数 Config config = new Config(); config.dataset = "Your Dataset"; config.accessKey = "Your Aceess Key"; config.secretKey = "Your Secret Key"; config.subscribeCenter = "http://localhost:6789"; config.network = NetworkType.NETWORK_EXTERNAL; config.consumer = new ConsumerConfig(); config.consumer.consumerGroup = "Your Consumer Group"; config.consumer.autoOffsetReset = AutoOffsetResetMode.LATEST; // Support "earliest" and "latest" // 2. 创建Subscriber对象,非线程安全 Subscriber subscriber = new Subscriber(config); // 4. 设置消息处理器,开启数据订阅&消费,该方法会阻塞式消费 subscriber.subscribe(new Handler()); } // 3. 业务自定义消息处理器,需实现IHandler接口 static class Handler implements IHandler { @Override public void handleMsg(Msg msg) { String value = new String(msg.getValue()); System.out.println(value); } } }
下载datasail_subscriber_go_cloud,与自己的项目目录在同一层
替换包名
go mod edit -replace volcengine.com/datasail/datasail_subscriber_go_cloud=../datasail_subscriber_go_cloud
更新依赖
go get volcengine.com/datasail/datasail_subscriber_go_cloud
package main import ( "fmt" "volcengine.com/datasail/datasail_subscriber_go_cloud" "volcengine.com/datasail/datasail_subscriber_go_cloud/config" ) func main() { // 1. Subscriber配置参数 conf := config.Config{ AccessKey: "Your Access Key", SecretKey: "Your Secret Key", SubscribeCenter: "http://localhost:6789", Dataset: "Your Dataset", Network: config.NetworkExternal, Consumer: config.ConsumerConfig{ ConsumerGroup: "Your Consumer Group", AutoOffsetReset: config.AutoOffsetResetEarliest, // Support "earliest" and "latest" }, } // 2. 创建Subscriber对象,非线程安全 subscriber, err := datasail_subscriber_go_cloud.NewSubscriber(conf) if err != nil { panic(err) } // 3. 业务自定义消息处理器,需实现HandleFunc handler := func(msg datasail_subscriber_go_cloud.Msg) { fmt.Println(string(msg.Value)) } // 4. 设置消息处理器,开启数据订阅&消费,该方法会阻塞式消费 if err := subscriber.Subscribe(handler); err != nil { panic(err) } }