本文介绍如何快速使用日志服务 Python SDK 实现基础的日志采集流程,包括创建日志项目、创建日志主题、写入日志和查询日志等操作。
VOLCENGINE_ACCESS_KEY_ID
等环境变量。环境变量的配置方式请参考配置身份认证信息。日志服务 Python SDK 在调用 PutLogs 接口时默认使用 lz4 压缩,但出于跨平台兼容性考虑,lz4a 库未包含在日志服务 Python SDK 的安装脚本中。如果您需要在上传日志过程中使用 lz4 压缩,则需要在环境中手动安装 lz4a 库。
pip install lz4a==0.7.0
如果您使用的是 Windows 系统或 Python 3.10 及后续版本,则 SDK 无法兼容 lz4 压缩,请您跳过 lz4a 库安装并在 PutLogsV2Request 中指定 compression=zlib
。
PutLogsV2Request 的使用示例如下:
使用 lz4 压缩
tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs))
使用 zlib 压缩
tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs, compression="zlib"))
初始化 Client 实例之后,才可以向 TLS 服务发送请求。初始化时推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。
初始化代码如下:
from volcengine.tls.TLSService import TLSService # 注意,环境变量中的endpoint不包含协议头(https://或http://),例如 tls-cn-beijing.ivolces.com。 endpoint = os.environ["VOLCENGINE_ENDPOINT"] region = os.environ["VOLCENGINE_REGION"] access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"] access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"] tls_service = TLSService(endpoint, access_key_id, access_key_secret, region)
本示例中,创建一个 example_tls.py
文件,并调用接口分别完成创建项目、创建主题、创建索引、写入日志数据、消费日志和查询日志数据。
代码示例如下:
# coding=utf-8 from __future__ import absolute_import from __future__ import division from __future__ import print_function import time from volcengine.tls.TLSService import TLSService from volcengine.tls.tls_requests import * if __name__ == "__main__": # 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 # 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 endpoint = os.environ["VOLCENGINE_ENDPOINT"] region = os.environ["VOLCENGINE_REGION"] access_key_id = os.environ["VOLCENGINE_ACCESS_KEY_ID"] access_key_secret = os.environ["VOLCENGINE_ACCESS_KEY_SECRET"] # 实例化TLS客户端 tls_service = TLSService(endpoint, access_key_id, access_key_secret, region) # 创建日志项目 create_project_request = CreateProjectRequest(project_name="project-name", region=region, description="project-description") create_project_response = tls_service.create_project(create_project_request) project_id = create_project_response.project_id # 创建日志主题 create_topic_request = CreateTopicRequest(topic_name="topic-name", project_id=project_id, ttl=3650, description="topic-description", shard_count=2) create_topic_response = tls_service.create_topic(create_topic_request) topic_id = create_topic_response.topic_id # 创建索引 full_text = FullTextInfo(case_sensitive=False, delimiter=",-;", include_chinese=False) value_info_a = ValueInfo(value_type="text", delimiter="", case_sensitive=True, include_chinese=False, sql_flag=False) value_info_b = ValueInfo(value_type="long", delimiter="", case_sensitive=False, include_chinese=False, sql_flag=True) key_value_info_a = KeyValueInfo(key="key1", value=value_info_a) key_value_info_b = KeyValueInfo(key="key2", value=value_info_b) key_value = [key_value_info_a, key_value_info_b] create_index_request = CreateIndexRequest(topic_id, full_text, key_value) create_index_response = tls_service.create_index(create_index_request) # 写入日志数据 logs = PutLogsV2Logs(source="192.168.1.1", filename="sys.log") for i in range(100): logs.add_log(contents={"key1": "value1-" + str(i + 1), "key2": "value2-" + str(i + 1)}, log_time=int(round(time.time()))) tls_service.put_logs_v2(PutLogsV2Request(topic_id, logs)) time.sleep(30) # 查询消费游标 describe_cursor_request = DescribeCursorRequest(topic_id, shard_id=0, from_time="begin") describe_cursor_response = tls_service.describe_cursor(describe_cursor_request) # 消费日志数据 consume_logs_request = ConsumeLogsRequest(topic_id, shard_id=0, cursor=describe_cursor_response.cursor) consume_logs_response = tls_service.consume_logs(consume_logs_request) # 当您需要检索和分析日志时,推荐您使用Python SDK提供的search_logs_v2方法,下面的代码提供了具体的调用示例 # 查询日志数据(全文检索) search_logs_request = SearchLogsRequest(topic_id, query="error", limit=10, start_time=1346457600000, end_time=1630454400000) search_logs_response = tls_service.search_logs_v2(search_logs_request) # 查询日志数据(键值检索) search_logs_request = SearchLogsRequest(topic_id, query="key1:error", limit=10, start_time=1346457600000, end_time=1630454400000) search_logs_response = tls_service.search_logs_v2(search_logs_request) # 查询日志数据(SQL分析) search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10, start_time=1346457600000, end_time=1630454400000) search_logs_response = tls_service.search_logs_v2(search_logs_request) # 查询日志数据(SQL分析) search_logs_request = SearchLogsRequest(topic_id, query="* | select key1, key2", limit=10, start_time=1346457600000, end_time=1630454400000) search_logs_response = tls_service.search_logs(search_logs_request)