Apache Flume 是 cloudera 公司开源的分布式海量日志聚合系统,可用于在系统中从多种不同数据源收集,聚合,传输大量日志数据到数据集中存储地。作为一款流式日志采集工具,Flume 提供对数据进行简单处理并写到各种数据接收方(可定制)的能力。
火山引擎 E-MapReduce(EMR)已经集成了 Flume ,您可以通过 Flume 将不同数据源的数据同步到 Kudu。
在 Hadoop 集群,Flume 是可选组件,如果要使用 Flume ,需要手工添加 Flume 组件。
登录 EMR 控制台。
在左侧导航栏中,选择并进入 集群列表 > 集群名称 > 服务列表 界面。
点击右上角 【添加服务】
选择 Flume,点击 确定 按钮,进行安装。
火山引擎 EMR 的 Flume 默认集成了写 Kudu 的依赖。下文为您介绍如何使用 Kudu 官方提供的组件消费数据并写到 Kudu。
说明
火山引擎 EMR 的 Flume 集成了 1.9.0 版本的 kudu-flume-sink 和 kudu-client。
EMR 集群已安装了 Kudu 和 Flume,详见:创建集群。
登录 EMR 控制台。
在左侧导航栏中,选择并进入 集群列表 > 集群名称 > 服务列表 > Kudu 界面。
点击展开 Kudu Master,列出 Kudu Master 的机器列表。
方案一:使用 SSH 方式登录到 Kudu Master 的其中一台机器上,详情参见使用 SSH连接主节点。
方案二:可以通过 EMR 集群 Kudu 组件服务部署拓扑中的 ECS 实例,跳转进入到云服务器的实例界面,单击右上角的远程连接按钮,输入集群创建时的 root 密码,进入远程终端。
登录集群后,直接在 Shell 中执行以下相关 Kudu 命令:
使用 kudu 命令创建 Kudu 表。
kudu table create emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051 '{"table_name":"test","schema":{"columns":[{"column_name":"id","column_type":"INT32","default_value":"1"},{"column_name":"name","column_type":"STRING","is_nullable":false,"comment":"user name"}],"key_column_names":["id"]},"partition":{"hash_partitions":[{"columns":["id"],"num_buckets":2,"seed":100}]},"extra_configs":{"configs":{"kudu.table.history_max_age_sec":"3600"}},"num_replicas":1}'
注意
上面的命令中 kudu master 地址需要用您的实际地址。
配置 flume
新建文件并编写 /tmp/flume-kudu.conf ,内容如下:
a1.sources = s1 a1.sinks = k1 a1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = cat /tmp/test.log a1.sources.s1.channels = c1 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1001 a1.channels.c1.transactionCapacity = 1000 # Describe the sink a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink a1.sinks.k1.masterAddresses = emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051 a1.sinks.k1.tableName = test a1.sinks.k1.operation = insert a1.sinks.k1.batchSize = 50 a1.sinks.k1.producer = org.apache.kudu.flume.sink.RegexpKuduOperationsProducer a1.sinks.k1.producer.pattern = (?<id>\\d+),(?<name>\\w+) a1.sinks.k1.channel = c1
准备输入数据
创建 /tmp/test.log ,并写入数据
rm /tmp/test.log echo "1,fang" > /tmp/test.log
启动 flume,读取 /tmp/test.log 的数据,并写到 kudu
/usr/lib/emr/current/flume/bin/flume-ng agent --conf conf --conf-file /tmp/flume-kudu.conf --name a1
查看 kudu 是否有数据写入
root@emr-3432jdr2za2uxxx-master-1:~# kudu table scan emr-3432jdr2za2uxxx-master-1:7051,emr-3432jdr2za2uxxx-core-1:7051,emr-3432jdr2za2uxxx-core-2:7051 test (int32 id=6, string name="fang") T 234196e2550f45e09aaa178ab42dcb7a scanned count 1 cost 0.00311413 seconds T 13247a58bdf84c248caf7c2f14116a83 scanned count 0 cost 0.00306098 seconds Total count 1 cost 0.00329723 seconds
RegexpKuduOperationsProducer 是 kudu 自带的消息体解析实现。此外,kudu 提供的解析实现有 AvroKuduOperationsProducer, SimpleKeyedKuduOperationsProducer, SimpleKuduOperationsProducer。如果这些解析都不满足需求,可以自定义特定的实现。
创建 maven 工程
mvn archetype:generate -DgroupId=com.example -DartifactId=kudu-flume-sink -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
添加 pom 依赖
<dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-flume-sink</artifactId> <version>1.9.0</version> </dependency>
编写 Processor
package com.example; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Operation; import org.apache.kudu.flume.sink.KuduOperationsProducer; import java.util.LinkedList; import java.util.List; public class MyProcessor implements KuduOperationsProducer { public void initialize(KuduTable kuduTable) { } public List<Operation> getOperations(Event event) { List<Operation> operations = new LinkedList<Operation>(); // TODO 解析 event ,生成 kudu 的 Operation return operations; } public void close() { } public void configure(Context context) { } }
打包
mvn package -DskipTests
上传包到服务器上的 /usr/lib/emr/current/flume/lib 下
使用自定义的 Processor
a1.sinks.k1.producer = com.example.MyProcessor