本文介绍如何快速使用 Volcengine Python SDK 实现基础的 Kafka 实例资源管理流程,包括创建实例、查询实例列表、创建 Topic 等操作。
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API CreateInstance 的示例代码如下。
from __future__ import print_function
import volcenginesdkcore
from pprint import pprint
from volcenginesdkcore.rest import ApiException
import volcenginesdkkafka
if __name__ == '__main__':
    # Sample script: create a Kafka instance through the Kafka V2
    # CreateInstance API and pretty-print the response.

    # "Your AK" / "Your SK" are placeholders; substitute your own access
    # key pair before running this sample.
    configuration = volcenginesdkcore.Configuration()
    configuration.ak = "Your AK"
    configuration.sk = "Your SK"
    configuration.region = "cn-beijing"
    # Set this configuration as the SDK-wide default.
    volcenginesdkcore.Configuration.set_default(configuration)

    # No configuration is passed here: the client uses the global default.
    api_instance = volcenginesdkkafka.KAFKAApi()

    try:
        # The IDs ending in "****" are masked sample values and must be
        # replaced with real VPC / subnet IDs.
        request = volcenginesdkkafka.CreateInstanceRequest(
            zone_id="cn-beijing-a",
            version="2.2.2",
            compute_spec="kafka.20xrate.hw",
            vpc_id="vpc-rs4yccs57e9sv0x57bf****",
            subnet_id="subnet-rrps5hvr1bswv0x58fp****",
            user_name="kafka2001",
            user_password="Test@123456",
            # Billing settings: PrePaid, period of 1 Month, auto-renew on.
            charge_info=volcenginesdkkafka.ChargeInfoForCreateInstanceInput(
                charge_type="PrePaid",
                auto_renew=True,
                period_unit="Month",
                period=1,
            ),
        )
        resp = api_instance.create_instance(request)
        pprint(resp)
    except ApiException as e:
        # Only SDK API errors are reported here; anything else propagates.
        print("Exception when calling api: %s\n" % e)
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API DescribeInstances 的示例代码如下。
from __future__ import print_function
import volcenginesdkcore
from pprint import pprint
from volcenginesdkcore.rest import ApiException
import volcenginesdkkafka
if __name__ == '__main__':
    # Sample script: list Kafka instances through the Kafka V2
    # DescribeInstances API and pretty-print the response.

    # "Your AK" / "Your SK" are placeholders; substitute your own access
    # key pair before running this sample.
    configuration = volcenginesdkcore.Configuration()
    configuration.ak = "Your AK"
    configuration.sk = "Your SK"
    configuration.region = "cn-beijing"
    # Set this configuration as the SDK-wide default.
    volcenginesdkcore.Configuration.set_default(configuration)

    # No configuration is passed here: the client uses the global default.
    api_instance = volcenginesdkkafka.KAFKAApi()

    try:
        # Request page 1 with a page size of 10.
        request = volcenginesdkkafka.DescribeInstancesRequest(
            page_size=10,
            page_number=1,
        )
        resp = api_instance.describe_instances(request)
        pprint(resp)
    except ApiException as e:
        # Only SDK API errors are reported here; anything else propagates.
        print("Exception when calling api: %s\n" % e)
通过 Volcengine Python SDK 调用消息队列 Kafka版 V2 API CreateTopic 的示例代码如下。
from __future__ import print_function
import volcenginesdkcore
from pprint import pprint
from volcenginesdkcore.rest import ApiException
import volcenginesdkkafka
if __name__ == '__main__':
    # Sample script: create a topic on an existing Kafka instance through
    # the Kafka V2 CreateTopic API and pretty-print the response.

    # "Your AK" / "Your SK" are placeholders; substitute your own access
    # key pair before running this sample.
    configuration = volcenginesdkcore.Configuration()
    configuration.ak = "Your AK"
    configuration.sk = "Your SK"
    configuration.region = "cn-beijing"
    # Set this configuration as the SDK-wide default.
    volcenginesdkcore.Configuration.set_default(configuration)

    # No configuration is passed here: the client uses the global default.
    api_instance = volcenginesdkkafka.KAFKAApi()

    try:
        # The instance ID ending in "****" is a masked sample value and
        # must be replaced with a real instance ID.
        request = volcenginesdkkafka.CreateTopicRequest(
            # all_authority is False, so access is listed per user here.
            access_policies=[
                volcenginesdkkafka.AccessPolicyForCreateTopicInput(
                    access_policy="PubSub",
                    user_name="user123",
                ),
            ],
            all_authority=False,
            description="describe",
            instance_id="kafka-cnngbnntswg1****",
            topic_name="mytopic123",
            replica_number=3,
            partition_number=3,
            # Topic-level settings are passed as one JSON-encoded string.
            parameters="{\"LogRetentionHours\":\"72\",\"MessageMaxByte\":\"10\",\"MinInsyncReplicaNumber\":\"2\"}",
        )
        resp = api_instance.create_topic(request)
        pprint(resp)
    except ApiException as e:
        # Only SDK API errors are reported here; anything else propagates.
        print("Exception when calling api: %s\n" % e)