本文介绍如何快速使用 Volcengine Java SDK 实现基础的 Kafka 实例资源管理流程,包括创建实例、创建 Topic等操作。
通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API CreateInstance 的示例代码如下。
package com.volcengine.kafka.examples; import com.volcengine.ApiClient; import com.volcengine.ApiException; import com.volcengine.kafka.KafkaApi; import com.volcengine.kafka.model.*; import com.volcengine.sign.Credentials; public class TestKafka { public static void main(String[] args) throws Exception { String ak = "Your AK"; String sk = "Your SK"; String region = "cn-beijing"; ApiClient apiClient = new ApiClient() .setCredentials(Credentials.getCredentials(ak, sk)) .setRegion(region); KafkaApi api = new KafkaApi(apiClient); ChargeInfoForCreateInstanceInput chargeInfo = new ChargeInfoForCreateInstanceInput(); chargeInfo.setChargeType("PrePaid"); chargeInfo.setPeriod(1); chargeInfo.setPeriodUnit("Month"); chargeInfo.setAutoRenew(true); CreateInstanceRequest createInstanceRequest = new CreateInstanceRequest(); createInstanceRequest.setZoneId("cn-beijing-a"); createInstanceRequest.setVersion("2.2.2"); createInstanceRequest.setComputeSpec("kafka.20xrate.hw"); createInstanceRequest.setVpcId("vpc-rs4yccs57e9sv0x57bf****"); createInstanceRequest.setSubnetId("subnet-rrps5hvr1bswv0x58fp****"); createInstanceRequest.setUserName("kafka2001"); createInstanceRequest.setUserPassword("Test@123456"); createInstanceRequest.setChargeInfo(chargeInfo); try { CreateInstanceResponse response = api.createInstance(createInstanceRequest); System.out.println(response); } catch (ApiException e) { System.out.println(e.getResponseBody()); } } }
通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API DescribeInstances 的示例代码如下。
package com.volcengine.kafka.examples; import com.volcengine.ApiClient; import com.volcengine.ApiException; import com.volcengine.kafka.KafkaApi; import com.volcengine.kafka.model.DescribeInstancesRequest; import com.volcengine.kafka.model.DescribeInstancesResponse; import com.volcengine.sign.Credentials; public class TestKafka { public static void main(String[] args) throws Exception { String ak = "Your AK"; String sk = "Your SK"; String region = "cn-beijing"; ApiClient apiClient = new ApiClient() .setCredentials(Credentials.getCredentials(ak, sk)) .setRegion(region); KafkaApi api = new KafkaApi(apiClient); DescribeInstancesRequest describeInstancesRequest = new DescribeInstancesRequest(); describeInstancesRequest.setPageNumber(1); describeInstancesRequest.setPageSize(10); try { DescribeInstancesResponse response = api.describeInstances(describeInstancesRequest); System.out.println(response); } catch (ApiException e) { System.out.println(e.getResponseBody()); } } }
通过 Volcengine Java SDK 调用消息队列 Kafka版 V2 API CreateTopic 的示例代码如下。
package com.volcengine.kafka.examples; import com.volcengine.ApiClient; import com.volcengine.ApiException; import com.volcengine.kafka.KafkaApi; import com.volcengine.kafka.model.*; import com.volcengine.sign.Credentials; import java.util.ArrayList; public class TestKafka { public static void main(String[] args) throws Exception { String ak = "Your AK"; String sk = "Your SK"; String region = "cn-beijing"; ApiClient apiClient = new ApiClient() .setCredentials(Credentials.getCredentials(ak, sk)) .setRegion(region); KafkaApi api = new KafkaApi(apiClient); AccessPolicyForCreateTopicInput policy = new AccessPolicyForCreateTopicInput(); policy.setAccessPolicy("PubSub"); policy.setUserName("user123"); ArrayList<AccessPolicyForCreateTopicInput> policies = new ArrayList<>(); policies.add(policy); CreateTopicRequest req = new CreateTopicRequest(); req.setInstanceId("kafka-cnngbnntswg1****"); req.setTopicName("my_topic3"); req.setPartitionNumber(3); req.setAllAuthority(false); req.setDescription("describe"); req.setReplicaNumber(3); req.setParameters("{\"LogRetentionHours\":\"72\",\"MessageMaxByte\":\"10\",\"MinInsyncReplicaNumber\":\"2\"}"); req.setAccessPolicies(policies); try { CreateTopicResponse response = api.createTopic(req); System.out.println(response); } catch (ApiException e) { System.out.println(e.getResponseBody()); } } }