You need to enable JavaScript to run this app.
导航
Python SDK
最近更新时间:2024.11.08 10:57:30首次发布时间:2024.03.27 14:56:19

本文介绍如何通过 Python SDK 接入云原生消息引擎 BMQ 并收发消息。

前提条件

  • 创建资源实例,并获取接入点地址,请参见管理资源池
  • 您需要提前在实例所属安全组中放开 9092 端口。具体操作,请参见添加安全组访问规则
  • (可选)您如果需要通过 SASL 用户名和密码进行鉴权,还需提前创建用户并获取密码。具体操作,请参见创建 SASL 用户

安装依赖

pip install kafka-python

设置Debug日志

import logging
import sys
logger = logging.getLogger('kafka')
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)

发送消息

创建并编写producer.py发送消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers='your broker list',
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaProducer
producer = KafkaProducer(
    bootstrap_servers="your broker list",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    ssl_check_hostname=False,
    api_version=(0, 10, 2),
)
for _ in range(100):
    result = producer.send('your topic', b'some_message_bytes').get()
    print("send message: partition " + str(result.partition) + " offset " + str(result.offset))

消费消息

创建并编写consumer.py接收消息。

PLAINTEXT

使用PLAINTEXT协议接入点地址连接 BMQ 实例时,无需鉴权。

from kafka import KafkaConsumer
consumer = KafkaConsumer('your topic', bootstrap_servers="your broker list", group_id="your consumer group",api_version=(0, 10, 2))
for msg in consumer:
    print(msg)

SASL_PLAINTEXT

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)

SASL_SSL

通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码

from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "your topic",
    bootstrap_servers="your broker list",
    group_id="your consumer group",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="用户名",
    sasl_plain_password="密码",
    ssl_check_hostname=False,
    api_version=(0, 10, 2),
)
for msg in consumer:
    print(msg)