本文介绍如何通过 Python SDK 接入云原生消息引擎 BMQ 并收发消息。
pip install kafka-python
import logging import sys logger = logging.getLogger('kafka') logger.addHandler(logging.StreamHandler(sys.stdout)) logger.setLevel(logging.DEBUG)
创建并编写producer.py
发送消息。
使用PLAINTEXT
协议接入点地址连接 BMQ 实例时,无需鉴权。
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers='your broker list', api_version=(0, 10, 2), ) for _ in range(100): result = producer.send('your topic', b'some_message_bytes').get() print("send message: partition " + str(result.partition) + " offset " + str(result.offset))
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers="your broker list", security_protocol="SASL_PLAINTEXT", sasl_mechanism="PLAIN", sasl_plain_username="用户名", sasl_plain_password="密码", api_version=(0, 10, 2), ) for _ in range(100): result = producer.send('your topic', b'some_message_bytes').get() print("send message: partition " + str(result.partition) + " offset " + str(result.offset))
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
from kafka import KafkaProducer producer = KafkaProducer( bootstrap_servers="your broker list", security_protocol="SASL_SSL", sasl_mechanism="PLAIN", sasl_plain_username="用户名", sasl_plain_password="密码", ssl_check_hostname=False, api_version=(0, 10, 2), ) for _ in range(100): result = producer.send('your topic', b'some_message_bytes').get() print("send message: partition " + str(result.partition) + " offset " + str(result.offset))
创建并编写consumer.py
接收消息。
使用PLAINTEXT
协议接入点地址连接 BMQ 实例时,无需鉴权。
from kafka import KafkaConsumer consumer = KafkaConsumer('your topic', bootstrap_servers="your broker list", group_id="your consumer group",api_version=(0, 10, 2)) for msg in consumer: print(msg)
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
from kafka import KafkaConsumer consumer = KafkaConsumer( "your topic", bootstrap_servers="your broker list", group_id="your consumer group", security_protocol="SASL_PLAINTEXT", sasl_mechanism="PLAIN", sasl_plain_username="用户名", sasl_plain_password="密码", api_version=(0, 10, 2), ) for msg in consumer: print(msg)
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
from kafka import KafkaConsumer consumer = KafkaConsumer( "your topic", bootstrap_servers="your broker list", group_id="your consumer group", security_protocol="SASL_SSL", sasl_mechanism="PLAIN", sasl_plain_username="用户名", sasl_plain_password="密码", ssl_check_hostname=False, api_version=(0, 10, 2), ) for msg in consumer: print(msg)