Flume 是一个分布式、高可靠的海量日志采集、聚合和传输系统,支持从各个应用程序中收集数据,并上传到一个存储系统中。本文介绍如何通过 Flume 的 Kafka Sink 将数据上传到日志服务。
当 Flume 作为数据采集工具时,Flume的 Kafka Sink 支持将 Flume Channel 中的数据发送到 Kafka 中,而日志服务支持通过 Kafka 协议接收数据,因此 Flume 可以通过 Kafka Sink 将数据上传到日志服务的日志主题中。
PutLogs
权限。详细信息请参考可授权的操作。通过 Flume 上传数据到日志服务时,需要配置 Flume 提供的 Kafka Sink。此处仅罗列了上传数据到日志服务所需要的参数和影响数据上传性能的参数,其他参数说明请参考 Kafka Sink。
说明
本文以 Flume 1.9.0 版本为例,不同 Flume 版本的配置参数可能不同,详细信息请参考Flume 官网文档。
参数名称 | 是否必选 | 示例值 | 描述 |
---|---|---|---|
type | 是 | org.apache.flume.sink.kafka.KafkaSink。 | 必须配置为 org.apache.flume.sink.kafka.KafkaSink。 |
kafka.bootstrap.servers | 是 |
| 初始连接的集群地址,格式为
说明 服务地址无需指定 |
kafka.producer.security.protocol | 是 | SASL_SSL | 为保证数据传输的安全性,此处必须设置连接协议为 SASL_SSL。 |
kafka.producer.sasl.mechanism | 是 | PLAIN | 设置上传数据到日志服务的鉴权方式,此处需设置为 PLAIN。 |
kafka.producer.sasl.jaas.config | 是 | 无 | 上传数据到日志服务的具体鉴权配置,包括 username 和 password。
|
kafka.topic | 否 | f93fc61xxxxefec8a5 | Kafka 主题,此处需配置为日志服务的日志主题 ID。Flume 上传数据时,将上传到日志服务的该日志主题中。 |
flumeBatchSize | 否 | 100 | 每次上传的数据条数。较大的值将提高吞吐量但同时将增加延迟。默认值为100,不建议修改。 |
kafka.producer.acks | 否 | 1 | Kafka 生产者的消息发送确认机制。默认值为 1 ,表示 Leader broker 写入成功。不建议修改。 |
kafka.producer.batch.size | 否 | 1048576 | Kafka 生产者批量发送数据的大小,单位为字节。 说明 建议设置为 1048576,即 1MB。 |
kafka.producer.linger.ms | 否 | 100 | Kafka 生产者在批量发送消息前的等待时间,单位为毫秒。 说明 建议设置为 100。 |
kafka.producer.compression.type | 否 | lz4 | Kafka 生产者发送消息的压缩方式,建议设置为 lz4。 |
下述配置表示 Flume 将 /var/log/test1/example.log
日志文件中的数据上传到日志服务。
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /var/log/test1/example.log #替换为实际的日志文件。 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers = tls-cn-beijing.ivolces.com:9094 a1.sinks.k1.channel = c1 a1.sinks.k1.kafka.topic = 5f93fcxxxxfec8a5 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.batch.size = 1048576 a1.sinks.k1.kafka.producer.linger.ms = 100 a1.sinks.k1.kafka.producer.compression.type = lz4 a1.sinks.k1.kafka.producer.security.protocol = SASL_SSL a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN a1.sinks.k1.kafka.producer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="20fxxxx8ad7af" password="AKxxxx#T0RNME5EUTFZVxxxx"; a1.channels.c1.type = memory