日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据。本文档介绍如何使用 Python SDK 消费组消费日志。
日志服务通过 SDK 提供了消费组(ConsumerGroup)功能,支持通过消费组消费日志数据,通过消费组消费时,日志服务会自动均衡各个消费者的消费能力与进度,自动分配 Shard,您无需关注消费组的内部调度细节及消费者之间的负载均衡、故障转移等,只需要专注于业务逻辑。
关于消费组消费日志数据的基本概念等背景信息,请参考通过消费组消费数据。
说明
日志服务 SDK 消费组实现了请求失败自动重试、消费进度检查点自动上报等机制。因此,您仅需要关注于如何处理每次消费得到的 LogGroupList 的业务逻辑实现即可。
Python SDK 中,ConsumerConfig 类的构造函数返回了 Python SDK 消费组配置,其中应配置 endpoint、region、accessKeyID、accessKeySecret 等基本信息、日志项目 ID 和日志主题 ID 列表、消费组名称和消费者名称。
除此之外,您还可通过 ConsumerConfig 的其他字段进行额外的自定义配置。ConsumerConfig 支持的参数如下:
参数 | 类型 | 示例值 | 描述 |
---|---|---|---|
max_fetch_log_group_count | int | 100 | 消费者单次消费日志时,最大获取 LogGroup 数量,默认为 100,最大为 1000。 |
heartbeat_interval_in_second | int | 20 | Consumer 心跳上报时间间隔,单位为秒。 |
data_fetch_interval_in_millisecond | int | 200 | Consumer 消费日志时间间隔,单位为毫秒。 |
flush_checkpoint_interval_in_second | int | 5 | Consumer 上传消费进度的时间间隔,单位为秒。 |
consume_from | str | begin | 开始消费时的默认消费位点,与 DescribeCursor 的 From 参数一致。仅在该消费者从未上传过消费位点时有效。 |
ordered_consume | bool | False | 是否开启顺序消费。开启顺序消费后,消费者会根据 Shard 分裂的父子关系进行消费。 |
以下代码以 Python SDK 为例,演示通过 SDK 创建消费组和消费者,并消费日志的整体流程。
# coding=utf-8 from __future__ import absolute_import from __future__ import division from __future__ import print_function import os import time from volcengine.tls.TLSService import TLSService from volcengine.tls.consumer.consumer import TLSConsumer, LogProcessor from volcengine.tls.consumer.consumer_model import ConsumerConfig from volcengine.tls.log_pb2 import LogGroupList # 您需要实现一个继承LogProcessor的类,并按照业务需要自行实现process函数,用于处理消费到的每个LogGroupList # 下面展示了逐个打印每条日志的基本代码实现 class MyLogProcessor(LogProcessor): def process(self, topic_id: str, shard_id: int, log_group_list: LogGroupList): print(topic_id + " --- " + str(shard_id)) count = 0 for log_group in log_group_list.log_groups: for log in log_group.logs: count += 1 print("*** Count = {} ***".format(count)) for content in log.contents: print("{}: {}".format(content.key, content.value)) print() if __name__ == "__main__": # 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 # 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 endpoint = os.environ["VOLCENGINE_ENDPOINT"] region = os.environ["VOLCENGINE_REGION"] access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"] access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"] # 实例化TLS客户端 tls_service = TLSService(endpoint, access_key_id, access_key_secret, region) # 配置消费组的必填参数,ConsumerConfig构造函数设定了一些默认参数,也可根据需要自定义配置 consumer_config = ConsumerConfig(project_id="your-project-id",, consumer_group_name="python-consumer-group", consumer_name="python-consumer", topic_id_list=["your-topic-id"]) tls_consumer = TLSConsumer(consumer_config, tls_service, MyLogProcessor()) # 调用start方法开始持续消费 tls_consumer.start() # 可通过调用tls_consumer.stop()来结束消费组消费 time.sleep(10) tls_consumer.stop()