You need to enable JavaScript to run this app.
导航
通过函数服务同步 Kafka 数据到 ES
最近更新时间:2024.11.13 17:48:39首次发布时间:2024.10.25 16:48:33

本文为您介绍如何基于函数服务的微服务应用模板,一键拉起数据处理服务,将 消息队列 Kafka 版 数据同步至 云搜索服务 ES

场景介绍

数据处理服务主要用于日志处理、分析场景,帮助企业快速发现和解决问题,提高运营效率。在大型系统中,为了保证系统能够正常采集并分析庞杂的日志数据, 一般做法是引入 Kafka 的流式架构来实现大量数据异步采集的方案。采集到的日志数据会经过 Kafka 流转, 由相应的数据消费组件将数据从 Kafka 消费存入到 Elasticsearch 中。使用 函数服务(Volcano Engine Function as a Service,veFaaS) 可作为替换开源工具 LogStash 的一种方案。 用户自建 LogStash 存在数据丢失、收集性能较差、资源消耗大、上手难度大等痛点。而采用 veFaaS 的 Serverless 数据处理方案,更加简单易用、轻量,并且具备自动弹性、低成本、高吞吐、低延迟等优势。

前提条件

  • 私有网络 VPC

    说明

    推荐所有数据传输都在 VPC 内完成,创建 Kafka 实例,ES 实例,veFaaS 微服务应用时请选择同一 VPC。

  • 消息队列 Kafka
  • 云搜索服务 ES
    • 已开通火山引擎云搜索服务。
    • 已创建 ES 实例,注意选择 ES 版本,并设置登录密码。详细操作请参见 创建实例
    • 已创建 ES 索引,详细操作请参见 创建 ES 索引

操作步骤

步骤一:创建微服务应用

选择 vefaas-native-microservice-kafka-to-es-exporter 函数模板,一键创建应用。

  1. 登录 函数服务控制台
  2. 在顶部导航栏,选择目标地域。
  3. 函数列表 页面,单击 创建函数,选择 创建 「微服务应用」
  4. 按要求配置函数。主要配置项说明如下,其余配置项保持默认值即可。
    Image

配置项

说明

部署方式

选择 函数模板:vefaas-native-microservice-kafka-to-es-exporter

网络

选择前提条件准备的 私有网络子网安全组

  1. 单击 确定,完成函数创建。

步骤二:更新配置文件

更新 config.json 配置文件,填写您的 Kafka 和 ES 相关配置。

  1. 在当前函数的 代码 页签,单击右上角下载图标,将代码包下载至本地。

  2. 本地解压代码包。

  3. 更新 config.json 配置文件。

    1. 在代码包中 打开 config_template.json 文件,将模板中的内容拷贝至 config.json 文件中。
    2. 根据需要修改 config.json 配置文件。
    {
      // Kafka 相关配置
      "kafka": {
        //kafka 接入点信息
        "bootstrap.servers": "127.0.0.1:8092",
        //安全协议
        "security.protocol": "SASL_PLAINTEXT",
        //是否开启 debug 模式
        "debug": false,
        // 待同步消息的 topic 名称
        "topics": "my-topic",
        //消费组 ID
        "consumer.group.id": "test",
        //SASL 相关配置
        "sasl": {
          //是否启用 SASL
          "enabled": true,
          //认证算法
          "mechanism": "PLAIN",
          //用户名
          "username": "test-user",
          //密码
          "password": "test-password"
        }
      },
      // elasticsearch 相关配置
      "elasticsearch": {
        //ES访问地址
        "host": "127.0.0.1:9200",
        //用户名
        "username": "test-user",
        //密码
        "password": "test-password",
        //要写入的索引名称
        "index": "test-index"
      }
    }
    

    字段

    说明

    Kafka 相关配置

    security.protocol

    安全协议。可选值有:

    • PLAINTEXT
    • SASL_PLAINTEXT(默认值)
    • SASL_SSL

    bootstrap.servers

    kafka 接入点信息。

    • security.protocol 是 PLAINTEXT,则填入默认接入点。
    • security.protocol 是 SASL_PLAINTEXT,则填入 SASL_PLAINTEXT 接入点。
    • security.protocol 是 SASL_SSL,则填入 SASL_SSL 接入点。

    说明

    可在消息队列 Kafka 版控制台的 实例详情 > 服务访问 中查看接入点地址,建议填写私网地址。

    debug

    是否开启 debug 模式。

    • true:开启
    • false:关闭

    topics

    待要同步消息的 Topic 列表。多个 Topic 使用","分隔,例如"topic1,topic2"

    consumer.group.id

    对应的消费组 ID。

    sasl

    SASL 相关配置。

    • enabled:是否启用 SASL。当 security.protocol 为 SASL_PLAINTEXT 或者 SASL_SSL,需要填写 SASL 相关配置,并设置 enabled 为 true。
    • mechanism: 认证算法。当前只支持 PLAIN 或 SCRAM-SHA-256,根据密码类型选择。
    • username&password:用户名和密码。可在消息队列 Kafka 版控制台的 实例详情 > 用户管理 中查看。

    ES 相关配置

    host

    ES 访问地址。

    说明

    可在云搜索控制台的实例 基本信息 中查看访问地址,建议填写私网地址,建议开启 HTTPS。

    username

    用户名。可在云搜索控制台的实例 基本信息 中查看,默认为 admin。

    password

    密码。创建实例时输入的登录密码,可在云搜索控制台的实例 基本信息 中重置密码。

    index

    要写入的索引名称,需要提前在 ES 中创建。

  4. 执行sh build_and_zip.sh,对代码进行编译和打包。

    注意

    必须在 Linux 平台执行,不支持交叉编译。

  5. 上传代码。
    在当前函数的 代码 页签,单击 上传 > 本地 Zip 文件上传,上传步骤 4 已打包的代码。

步骤三:部署应用

将函数的当前代码和配置发布至线上。

  1. 在函数详情页面,单击右上角的 发布
  2. 函数版本使用 Latest。其他参数保持默认。
  3. 单击 确定。等待函数发布完成。

Image

结果验证

  1. 使用目标 Kafka Topic 发送消息。
    在消息队列 Kafka 版控制台的 实例详情 > Topic 管理 页签,选择目标 Topic,单击操作列的 发送消息。本示例发送的消息内容为 hello。
    Image

  2. 通过 ES API 查询文档是否创建成功。

    # https 方式可指定--insecure快速验证
    # test-index 为本示例的索引名称
    curl https://elasticsearch-o-xxxxx.escloud.volces.com:9200/test-index/_search --insecure -u admin:xxxx
    

    预期 ES 返回示例如下:

    {
        "took": 3,
        "timed_out": false,
        "_shards": {
            "total": 1,
            "successful": 1,
            "skipped": 0,
            "failed": 0
        },
        // 创建的文档:包含 kafka 消息的 topic、value、partition、offset 以及消息时间戳等等信息。由 "value": "hello" 可知,消息同步成功。
        "hits": {
            "total": {
                "value": 1,
                "relation": "eq"
            },
            "max_score": 1.0,
            "hits": [
                {
                    "_index": "test-index",
                    "_type": "_doc",
                    "_id": "0xKjb5IBOCLUIact9aQp",
                    "_score": 1.0,
                    "_source": {
                        "header": {},
                        "key": "",
                        "offset": 0,
                        "partition": 7,
                        "timestamp": "2024-10-09 12:57:48.631 +0800 CST",
                        "topic": "my-topic",
                        "value": "hello"
                    }
                }
            ]
        }
    }
    

相关操作

部署成功后,用户无需手动调整实例数,veFaaS 会根据程序负载进行自动扩缩容,在高负载时快速自动扩容,在低负载时自动缩容节省成本。您也可以根据需要自行设置扩缩容策略,详情请参见 弹性扩缩