本文以 Python 客户端为例,介绍如何在 VPC 或公网环境下通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入消息队列 Kafka版,并收发消息。
已完成准备工作。详细说明请参考准备工作。
创建消息队列 Kafka版配置文件 config.json
。配置文件字段的详细说明,请参考SDK 配置说明。
通过 SASL_PLAINTEXT 接入点 SCRAM 机制接入时,配置文件示例如下。
说明
{ "bootstrap.servers": "xxxxx", // 修改配置为实例的SASL接入点 "security.protocol": "SASL_PLAINTEXT", // 固定为SASL_PLAINTEXT "topic": "xxxx", // 修改配置为待发送的topic名称 "consumer": { "group.id": "xxxx" // 修改为指定消费组的名称 }, "sasl": { "mechanism": "SCRAM-SHA-256", // 用户类型为Scram时固定为SCRAM-SHA-256 "username": "xxxx", // SCRAM用户名 "password": "xxxx" // SCRAM用户密码 } }
producer.py
。producer.py
发送消息。通过 SASL_PLAINTEXT 接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/producer.py
,实现相关业务逻辑。
from confluent_kafka import Producer def callback(err, meta): """ py:function:: callback(err, meta) Handle the result of message delivery. :param confluent_kafka.KafkaError err: error if delivery is failed :param confluent_kafka.Message meta: message metadata if delivery is success :return: None """ if err is None: print('[INFO] Delivered message to topic {} [{}] at offset {}' .format(meta.topic(), meta.partition(), meta.offset())) else: print('[ERROR] Delivery failed: {}'.format(err)) def run_producer(conf): # create producer instance p = Producer(conf.producer_conf()) for i in range(10): # send a message p.produce(conf.topic(), 'demon message {}'.format(i).encode('utf-8'), on_delivery=callback) # call poll method to handle deliver result p.poll(timeout=0) # flush all messages p.flush(timeout=5.0)
consumer.py
。consumer.py
消费消息。通过默认接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.py
,实现相关业务逻辑。
from confluent_kafka import Consumer def handle_message(msg): """ py:function:: handle_message(msg) Handle consumed message. :param confluent_kafka.Message msg: consumed message meta. :return: None """ print('[INFO] Consumed a message from topic {} [{}] at offset {}. value: {}'. format(msg.topic(), msg.partition(), msg.offset(), msg.value().decode('utf-8'))) def run_consumer(conf): # create a consumer instance c = Consumer(conf.consumer_conf()) # subscribe topic c.subscribe([conf.topic()]) try: count = 0 while count < 10: msg = c.poll(timeout=1.0) if msg is None: # consumer initial and group rebalance may take some time. continue if msg.error(): print('[ERROR] Consume msg failed, {}'.format(msg.error())) continue # handle kafka messages. this handle_message(msg) count += 1 c.commit(asynchronous=False) finally: c.close()