本文介绍如何快速使用 Volcengine Python SDK 实现基础的 Kafka 实例资源管理流程,包括创建实例、创建 Topic 等操作。
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API CreateInstance 的示例代码如下。
from __future__ import print_function import volcenginesdkcore from pprint import pprint from volcenginesdkcore.rest import ApiException import volcenginesdkkafka if __name__ == '__main__': configuration = volcenginesdkcore.Configuration() configuration.ak = "Your AK" configuration.sk = "Your SK" configuration.region = "cn-beijing" # set default configuration volcenginesdkcore.Configuration.set_default(configuration) # use global default configuration api_instance = volcenginesdkkafka.KAFKAApi() try: resp = api_instance.create_instance(volcenginesdkkafka.CreateInstanceRequest( zone_id="cn-beijing-a", version="2.2.2", compute_spec="kafka.20xrate.hw", vpc_id="vpc-rs4yccs57e9sv0x57bf****", subnet_id="subnet-rrps5hvr1bswv0x58fp****", user_name="kafka2001", user_password="Test@123456", charge_info=volcenginesdkkafka.ChargeInfoForCreateInstanceInput( charge_type="PrePaid", auto_renew=True, period_unit="Month", period=1 ) )) pprint(resp) except ApiException as e: print("Exception when calling api: %s\n" % e)
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API DescribeInstances 的示例代码如下。
from __future__ import print_function import volcenginesdkcore from pprint import pprint from volcenginesdkcore.rest import ApiException import volcenginesdkkafka if __name__ == '__main__': configuration = volcenginesdkcore.Configuration() configuration.ak = "Your AK" configuration.sk = "Your SK" configuration.region = "cn-beijing" # set default configuration volcenginesdkcore.Configuration.set_default(configuration) # use global default configuration api_instance = volcenginesdkkafka.KAFKAApi() try: resp = api_instance.describe_instances(volcenginesdkkafka.DescribeInstancesRequest( page_size=10, page_number=1, )) pprint(resp) except ApiException as e: print("Exception when calling api: %s\n" % e)
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API CreateTopic 的示例代码如下。
from __future__ import print_function import volcenginesdkcore from pprint import pprint from volcenginesdkcore.rest import ApiException import volcenginesdkkafka if __name__ == '__main__': configuration = volcenginesdkcore.Configuration() configuration.ak = "Your AK" configuration.sk = "Your SK" configuration.region = "cn-beijing" # set default configuration volcenginesdkcore.Configuration.set_default(configuration) # use global default configuration api_instance = volcenginesdkkafka.KAFKAApi() try: resp = api_instance.create_topic(volcenginesdkkafka.CreateTopicRequest( access_policies=[ volcenginesdkkafka.AccessPolicyForCreateTopicInput( access_policy="PubSub", user_name="user123" ) ], all_authority=False, description="describe", instance_id="kafka-cnngbnntswg1****", topic_name="mytopic123", replica_number=3, partition_number=3, parameters="{\"LogRetentionHours\":\"72\",\"MessageMaxByte\":\"10\",\"MinInsyncReplicaNumber\":\"2\"}" )) pprint(resp) except ApiException as e: print("Exception when calling api: %s\n" % e)