You need to enable JavaScript to run this app.
导航
使用 Flume 同步数据到 Kudu
最近更新时间:2023.11.27 14:22:06首次发布时间:2022.10.21 16:56:21

Apache Flume 是 cloudera 公司开源的分布式海量日志聚合系统,可用于在系统中从多种不同数据源收集,聚合,传输大量日志数据到数据集中存储地。作为一款流式日志采集工具,Flume 提供对数据进行简单处理并写到各种数据接收方(可定制)的能力。

火山引擎 E-MapReduce(EMR)已经集成了 Flume ,您可以通过 Flume 将不同数据源的数据同步到 Kudu。

1 添加 Flume 服务

在 Hadoop 集群,Flume 是可选组件,如果要使用 Flume ,需要手工添加 Flume 组件。

  1. 登录 EMR 控制台

  2. 在左侧导航栏中,选择并进入 集群列表 > 集群名称 > 服务列表 界面。

  3. 点击右上角 【添加服务

  4. 选择 Flume,点击 确定 按钮,进行安装。

2 使用 Flume 写 Kudu

火山引擎 EMR 的 Flume 默认集成了写 Kudu 的依赖。下文为您介绍如何使用 Kudu 官方提供的组件消费数据并写到 Kudu。

说明

火山引擎 EMR 的 Flume 集成了 1.9.0 版本的 kudu-flume-sink 和 kudu-client。

2.1 前提条件

EMR 集群已安装了 Kudu 和 Flume,详见:创建集群

2.2 获得 Kudu 的 master 地址

  1. 登录 EMR 控制台

  2. 在左侧导航栏中,选择并进入 集群列表 > 集群名称 > 服务列表 > Kudu 界面。

  3. 点击展开 Kudu Master,列出 Kudu Master 的机器列表。

2.3 登录集群操作

  • 方案一:使用 SSH 方式登录到 Kudu Master 的其中一台机器上,详情参见使用 SSH连接主节点

  • 方案二:可以通过 EMR 集群 Kudu 组件服务部署拓扑中的 ECS 实例,跳转进入到云服务器的实例界面,单击右上角的远程连接按钮,输入集群创建时的 root 密码,进入远程终端。

登录集群后,直接在 Shell 中执行以下相关 Kudu 命令:

  1. 使用 kudu 命令创建 Kudu 表。

    kudu table create emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051 '{"table_name":"test","schema":{"columns":[{"column_name":"id","column_type":"INT32","default_value":"1"},{"column_name":"name","column_type":"STRING","is_nullable":false,"comment":"user name"}],"key_column_names":["id"]},"partition":{"hash_partitions":[{"columns":["id"],"num_buckets":2,"seed":100}]},"extra_configs":{"configs":{"kudu.table.history_max_age_sec":"3600"}},"num_replicas":1}'
    

注意

上面的命令中 kudu master 地址需要用您的实际地址。

  1. 配置 flume

    新建文件并编写 /tmp/flume-kudu.conf ,内容如下:

    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.s1.type = exec  
    a1.sources.s1.command = cat /tmp/test.log  
    a1.sources.s1.channels = c1 
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1001
    a1.channels.c1.transactionCapacity = 1000
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
    a1.sinks.k1.masterAddresses = emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051
    a1.sinks.k1.tableName = test
    a1.sinks.k1.operation = insert
    a1.sinks.k1.batchSize = 50
    a1.sinks.k1.producer = org.apache.kudu.flume.sink.RegexpKuduOperationsProducer
    a1.sinks.k1.producer.pattern = (?<id>\\d+),(?<name>\\w+)
    a1.sinks.k1.channel = c1
    
  2. 准备输入数据

    创建 /tmp/test.log ,并写入数据

    rm /tmp/test.log
    echo "1,fang" > /tmp/test.log
    
  3. 启动 flume,读取 /tmp/test.log 的数据,并写到 kudu

    /usr/lib/emr/current/flume/bin/flume-ng agent --conf conf --conf-file /tmp/flume-kudu.conf --name a1
    
  4. 查看 kudu 是否有数据写入

    root@emr-3432jdr2za2uxxx-master-1:~# kudu table scan emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051 test
    (int32 id=6, string name="fang")
    T 234196e2550f45e09aaa178ab42dcb7a scanned count 1 cost 0.00311413 seconds
    T 13247a58bdf84c248caf7c2f14116a83 scanned count 0 cost 0.00306098 seconds
    Total count 1 cost 0.00329723 seconds
    

3 使用自定义的 KuduOperationsProducer

RegexpKuduOperationsProducer 是 kudu 自带的消息体解析实现。此外,kudu 提供的解析实现有 AvroKuduOperationsProducer, SimpleKeyedKuduOperationsProducer, SimpleKuduOperationsProducer。如果这些解析都不满足需求,可以自定义特定的实现。

  1. 创建 maven 工程

    mvn archetype:generate -DgroupId=com.example -DartifactId=kudu-flume-sink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
    
  2. 添加 pom 依赖

    <dependency>
          <groupId>org.apache.kudu</groupId>
          <artifactId>kudu-flume-sink</artifactId>
          <version>1.9.0</version>
    </dependency>
    
  3. 编写 Processor

    package com.example;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.flume.sink.KuduOperationsProducer;
    
    import java.util.LinkedList;
    import java.util.List;
    
    public class MyProcessor implements KuduOperationsProducer {
        public void initialize(KuduTable kuduTable) {
        }
    
        public List<Operation> getOperations(Event event) {
            List<Operation> operations = new LinkedList<Operation>();
            // TODO 解析 event ,生成 kudu 的 Operation
            return operations;
        }
    
        public void close() {
    
        }
    
        public void configure(Context context) {
    
        }
    }
    
  4. 打包

    mvn package -DskipTests
    
  5. 上传包到服务器上的 /usr/lib/emr/current/flume/lib 下

  6. 使用自定义的 Processor

    a1.sinks.k1.producer = com.example.MyProcessor