You need to enable JavaScript to run this app.
导航
默认接入点收发消息
最近更新时间:2023.07.20 21:24:37首次发布时间:2023.02.27 14:17:44

本文以 Python 客户端为例,介绍如何在 VPC 环境下通过默认接入点(PLAINTEXT)接入消息队列 Kafka版,并收发消息。

前提条件

已完成准备工作。详细说明请参考准备工作

1 添加配置文件

创建消息队列 Kafka版配置文件 config.json。配置文件字段的详细说明,请参考SDK 配置说明
使用默认接入点时,配置文件示例如下。

说明

请根据注释提示填写相关参数,并删除注释。

{
  "bootstrap.servers": "xxxxx", // 修改配置为实例的默认接入点
  "security.protocol": "PLAINTEXT", // 默认接入点访问时,固定设置为 PLAINTEXT
  "topic": "xxxx", // 修改配置为待发送的topic名称
  "consumer": {
    "group.id": "xxxx" // 修改为指定消费组的名称
  }
}

2 发送消息

实现方法

  1. 创建消息发送程序 producer.py
  2. 编译并运行 producer.py 发送消息。
  3. 查看运行结果。
    运行结果示例如下。
    图片

说明

消息队列 Kafka版提供示例项目供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令发送并消费消息。

python3 {DemoPath}/bytedance_kafka.py

示例代码

通过默认接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/producer.py,实现相关业务逻辑。

from confluent_kafka import Producer


def callback(err, meta):
    """
    py:function:: callback(err, meta)

    Handle the result of message delivery.

    :param confluent_kafka.KafkaError err: error if delivery is failed
    :param confluent_kafka.Message meta: message metadata if delivery is success
    :return: None
    """
    if err is None:
        print('[INFO] Delivered message to topic {} [{}] at offset {}'
              .format(meta.topic(), meta.partition(), meta.offset()))
    else:
        print('[ERROR] Delivery failed: {}'.format(err))


def run_producer(conf):
    # create producer instance
    p = Producer(conf.producer_conf())

    for i in range(10):
        # send a message
        p.produce(conf.topic(), 'demon message {}'.format(i).encode('utf-8'), on_delivery=callback)
        # call poll method to handle deliver result
        p.poll(timeout=0)

    # flush all messages
    p.flush(timeout=5.0)

3 消费消息

实现方法

  1. 创建 Consumer 订阅消息程序 consumer.py
  2. 编译并运行 consumer.py 消费消息。
  3. 查看运行结果。

消息消费示例代码

通过默认接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/client/consumer.py,实现相关业务逻辑。

from confluent_kafka import Consumer


def handle_message(msg):
    """
    py:function:: handle_message(msg)

    Handle consumed message.

    :param confluent_kafka.Message msg: consumed message meta.
    :return: None
    """
    print('[INFO] Consumed a message from topic {} [{}] at offset {}. value: {}'.
          format(msg.topic(), msg.partition(), msg.offset(), msg.value().decode('utf-8')))


def run_consumer(conf):
    # create a consumer instance
    c = Consumer(conf.consumer_conf())
    # subscribe topic
    c.subscribe([conf.topic()])

    try:
        count = 0
        while count < 10:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # consumer initial and group rebalance may take some time.
                continue
            if msg.error():
                print('[ERROR] Consume msg failed, {}'.format(msg.error()))
                continue
            # handle kafka messages. this
            handle_message(msg)
            count += 1

        c.commit(asynchronous=False)
    finally:
        c.close()