本文为您介绍如何基于函数服务的微服务应用模板,一键拉起数据处理服务,将 消息队列 Kafka 版 数据同步至 云搜索服务 ES。
数据处理服务主要用于日志处理、分析场景,帮助企业快速发现和解决问题,提高运营效率。在大型系统中,为了保证系统能够正常采集并分析庞杂的日志数据, 一般做法是引入 Kafka 的流式架构来实现大量数据异步采集的方案。采集到的日志数据会经过 Kafka 流转, 由相应的数据消费组件将数据从 Kafka 消费存入到 Elasticsearch 中。使用 函数服务(Volcano Engine Function as a Service,veFaaS) 可作为替换开源工具 LogStash 的一种方案。 用户自建 LogStash 存在数据丢失、收集性能较差、资源消耗大、上手难度大等痛点。而采用 veFaaS 的 Serverless 数据处理方案,更加简单易用、轻量,并且具备自动弹性、低成本、高吞吐、低延迟等优势。
说明
推荐所有数据传输都在 VPC 内完成,创建 Kafka 实例,ES 实例,veFaaS 微服务应用时请选择同一 VPC。
选择 vefaas-native-microservice-kafka-to-es-exporter 函数模板,一键创建应用。
配置项 | 说明 |
---|---|
部署方式 | 选择 函数模板:vefaas-native-microservice-kafka-to-es-exporter。 |
网络 | 选择前提条件准备的 私有网络、子网 和 安全组。 |
更新 config.json 配置文件,填写您的 Kafka 和 ES 相关配置。
在当前函数的 代码 页签,单击右上角下载图标,将代码包下载至本地。
本地解压代码包。
更新 config.json 配置文件。
{ // Kafka 相关配置 "kafka": { //kafka 接入点信息 "bootstrap.servers": "127.0.0.1:8092", //安全协议 "security.protocol": "SASL_PLAINTEXT", //是否开启 debug 模式 "debug": false, // 待同步消息的 topic 名称 "topics": "my-topic", //消费组 ID "consumer.group.id": "test", //SASL 相关配置 "sasl": { //是否启用 SASL "enabled": true, //认证算法 "mechanism": "PLAIN", //用户名 "username": "test-user", //密码 "password": "test-password" } }, // elasticsearch 相关配置 "elasticsearch": { //ES访问地址 "host": "127.0.0.1:9200", //用户名 "username": "test-user", //密码 "password": "test-password", //要写入的索引名称 "index": "test-index" } }
字段 | 说明 |
---|---|
Kafka 相关配置 | |
security.protocol | 安全协议。可选值有:
|
bootstrap.servers | kafka 接入点信息。
说明 可在消息队列 Kafka 版控制台的 实例详情 > 服务访问 中查看接入点地址,建议填写私网地址。 |
debug | 是否开启 debug 模式。
|
topics | 待要同步消息的 Topic 列表。多个 Topic 使用","分隔,例如 |
consumer.group.id | 对应的消费组 ID。 |
sasl | SASL 相关配置。
|
ES 相关配置 | |
host | ES 访问地址。 说明 可在云搜索控制台的实例 基本信息 中查看访问地址,建议填写私网地址,建议开启 HTTPS。 |
username | 用户名。可在云搜索控制台的实例 基本信息 中查看,默认为 admin。 |
password | 密码。创建实例时输入的登录密码,可在云搜索控制台的实例 基本信息 中重置密码。 |
index | 要写入的索引名称,需要提前在 ES 中创建。 |
执行sh build_and_zip.sh
,对代码进行编译和打包。
注意
必须在 Linux 平台执行,不支持交叉编译。
上传代码。
在当前函数的 代码 页签,单击 上传 > 本地 Zip 文件上传,上传步骤 4 已打包的代码。
将函数的当前代码和配置发布至线上。
使用目标 Kafka Topic 发送消息。
在消息队列 Kafka 版控制台的 实例详情 > Topic 管理 页签,选择目标 Topic,单击操作列的 发送消息。本示例发送的消息内容为 hello。
通过 ES API 查询文档是否创建成功。
# https 方式可指定--insecure快速验证 # test-index 为本示例的索引名称 curl https://elasticsearch-o-xxxxx.escloud.volces.com:9200/test-index/_search --insecure -u admin:xxxx
预期 ES 返回示例如下:
{ "took": 3, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, // 创建的文档:包含 kafka 消息的 topic、value、partition、offset 以及消息时间戳等等信息。由 "value": "hello" 可知,消息同步成功。 "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "test-index", "_type": "_doc", "_id": "0xKjb5IBOCLUIact9aQp", "_score": 1.0, "_source": { "header": {}, "key": "", "offset": 0, "partition": 7, "timestamp": "2024-10-09 12:57:48.631 +0800 CST", "topic": "my-topic", "value": "hello" } } ] } }
部署成功后,用户无需手动调整实例数,veFaaS 会根据程序负载进行自动扩缩容,在高负载时快速自动扩容,在低负载时自动缩容节省成本。您也可以根据需要自行设置扩缩容策略,详情请参见 弹性扩缩。