日志服务全面集成 Flink 生态体系,不仅支持接收由 Flink connector 写入的日志,还支持通过 flink-connector-kafka 将日志消费到下游的大数据组件中。
Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架,具有丰富的连接器生态,例如 Flink connector 和 Apache Kafka 连接器(flink-connector-kafka 插件)。
日志服务全面集成 Flink 生态体系,支持通过 Flink connector 轻松接入不同源头的各类日志,实现日志的实时采集、深度分析与结构化处理等操作;支持通过 flink-connector-kafka 插件无缝对接大数据处理与分析平台,将处理后的日志消费到下游的大数据组件或者数据仓库。
说明
目前只支持 Flink at least once 语义,在任务失败时,写入日志服务中的数据有可能会重复,但不会丢失。
初始化 FlinkLogSInk。
设置生产参数。
一般情况下使用默认值即可,如有需要可以自定义配置。配置示例,请参考配置示例。
配置项 | 是否必选 | 默认值 | 描述 |
---|---|---|---|
endpint | 是 | / | 日志服务的服务地址,例如:tls-cn-beijing.volces.com。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。 说明 服务地址无需指定 |
region | 是 | / | 日志项目所在地域,例如:cn-beijing。 |
accessKey | 是 | / | 火山引擎账号或 IAM 账号的 Access Key ID。您可以在火山引擎控制台密钥管理页面,获取 Access Key ID。 |
AccessSecret | 是 | / | 火山引擎主账号或 IAM 用户的 Secret Access Key。您可以在火山引擎控制台密钥管理页面,获取 Secret Access Key。 |
totalSizeInBytes | 否 | 104857600 | 日志服务最大可用的缓存发送日志内存大小。 |
maxThreadCount | 否 | 50 | 日志服务发送日志最大可用线程数。 |
maxBatchSizeBytes | 否 | 524288 | 单个请求发送的日志大小,最大不超过 10MB。 |
maxBatchCount | 否 | 4096 | 单个请求发送的日志条数,最大不超过 40960。 |
lingerMs | 否 | 2000 | 单个请求的最长等待时间,单位:毫秒。 |
maxBlockMs | 否 | 60000L | 申请可用内存最长等待时间,单位:毫秒 |
retryCount | 否 | 2 | 发送失败可重试的次数,最大不超过 4 次。 |
maxReservedAttempts | 否 | 3 | 可保留的请求记录数,最大不超过 5。 |
multiAccountEnable | 否 | false | 是否开启跨账户写入数据。 |
roleName | 否 | / | 目标账户授权的角色名称,当 |
failStrategy | 否 | log_pass | 失败策略。可选值:
|
重载 LogSerializationFunction 函数,将数据序列化为 FlinkLogGroup。
/** SimpleStringFunction. */ public static class SimpleStringFunction implements LogSerializationFunction<String> { private final String topic; public SimpleStringFunction(String topic) { this.topic = topic; } @Override public FlinkLogGroup serialize(String element) { FlinkLogGroup group = new FlinkLogGroup(); group.setTopic(topic); LogItem log = new LogItem(); log.setTime(System.currentTimeMillis()); log.addContent("content", element); group.getLogs().add(log); return group; } }
模拟数据写入日志服务。
/** 模拟生产消息. */ public static class RandomStringSource implements ParallelSourceFunction<String> { private volatile boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" getLongString(30)); } } @Override public void cancel() { running = false; } public static String getLongString(int length) { String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; Random random = new Random(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < length; i++) { int number = random.nextInt(62); sb.append(str.charAt(number)); } return sb.toString(); } }
您可以参考如下示例完成日志写入。
package org.apache.flink.connectors.tls; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connectors.tls.config.LogConstants; import org.apache.flink.connectors.tls.entry.FlinkLogGroup; import org.apache.flink.connectors.tls.entry.LogSerializationFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import com.volcengine.model.tls.LogItem; import java.util.Random; /** TLS sink sample. */ public class FlinkLogSinkSample { private static final String DEFAULT_TLS_ENDPOINT = "http://tls-cn-guilin-boe.volces.com"; private static final String DEFAULT_REGION = "cn-guilin-boe"; private static final String ACCESS_KEY_ID = ""; private static final String ACCESS_KEY_SECRET = ""; public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); env.setParallelism(3); env.enableCheckpointing(5000); DataStream<String> stream = env.addSource(new RandomStringSource()); // 初始化客户端,推荐通过环境变量动态获取火山引擎密钥等身份认证信息,以免 AccessKey // 硬编码引发数据安全风险。详细说明请参考https://www.volcengine.com/docs/6470/1166455 // 使用 STS 时,ak 和 sk 均使用临时密钥,且设置 VOLCENGINE_TOKEN;不使用 STS 时,VOLCENGINE_TOKEN 部分传空 // FlinkLogSink初始化时配置有默认值,可根据实际情况调整 FlinkLogSink<String> flinkLogSink = new FlinkLogSink.Builder<String>( params.get(LogConstants.LOG_ENDPOINT, DEFAULT_TLS_ENDPOINT), params.get(LogConstants.LOG_REGION, DEFAULT_REGION), params.get(LogConstants.LOG_ACCESS_KEY, ACCESS_KEY_ID), params.get(LogConstants.LOG_ACCESS_SECRET, ACCESS_KEY_SECRET)) .setSerializationFunction(new SimpleStringFunction(params.get("TOPIC"))) .build(); stream.addSink(flinkLogSink); env.execute("flink log producer"); } /** 模拟生产消息. */ public static class RandomStringSource implements ParallelSourceFunction<String> { private volatile boolean running = true; @Override public void run(SourceContext<String> ctx) throws Exception { long seq = 0; while (running) { Thread.sleep(10); ctx.collect((seq++) + "-" getLongString(30)); } } @Override public void cancel() { running = false; } public static String getLongString(int length) { String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; Random random = new Random(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < length; i++) { int number = random.nextInt(62); sb.append(str.charAt(number)); } return sb.toString(); } } /** SimpleStringFunction. */ public static class SimpleStringFunction implements LogSerializationFunction<String> { private final String topic; public SimpleStringFunction(String topic) { this.topic = topic; } @Override public FlinkLogGroup serialize(String element) { FlinkLogGroup group = new FlinkLogGroup(); group.setTopic(topic); LogItem log = new LogItem(); log.setTime(System.currentTimeMillis()); log.addContent("content", element); group.getLogs().add(log); return group; } } }
通过 Flink 消费火山引擎日志服务的日志数据时,需要使用 Flink 提供的 flink-connector-kafka 作为 Connctor。
说明
Apache Flink 内置了多个 Kafka client,不同 Flink 发行版之间其使用的客户端版本可能会发生改变。目前日志服务仅支持 0.11.x 及以上的 Kafka client 连接,对应的 Flink 版本为 1.7.x 及以上,如果使用 1.7.x 以下的 Flink,需要手动指定 Kafka client的版本为 0.11.x 及以上。
在项目中添加 flink-connector-kafka 相关的 Maven 依赖。依赖的详细信息如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.16.0</version> </dependency>
Kafka Source 提供了构建类来创建 KafkaSource
的实例。其使用方法和实现细节请参考 Flink。
在构建 KafkaSource 时必须通过以下方法指定基础属性。
方法 | 说明 |
---|---|
setBootstrapServers() | Bootstrap server,应配置为初始连接的集群地址。格式为
|
setTopics() | 要订阅的 Kafka Topic。此处应配置为日志服务的 Kafka 协议消费主题 ID,格式为 |
setGroupId() | 消费者组 ID。 |
setValueOnlyDeserializer() | 用于解析 Kafka 消息的反序列化器(Deserializer),详细信息请参考消息解析。 |
setProperty() | 安全模式、授权模式等设置。应指定以下配置:
|
以下代码片段展示了如何构建 KafkaSource 来消费日志主题 0fdaa6b6-3c9f-424c-8664-fc0d222c****
中的日志数据。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // Consuming data from TLS kafka KafkaSource<String> tlsKafkaSource = KafkaSource.<String>builder() //初始连接的集群地址。格式为服务地址:端口号,例如 tls-cn-beijing.ivolces.com:9093,其中: //服务地址为当前地域下日志服务的服务地址。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务地址。 //端口号固定为 9093。 .setBootstrapServers(tls-cn-beijing.ivolces.com:9093) //要消费的topic,格式为out-xxxxxxxxx .setTopics(out-0fdaa6b6-3c9f-424c-8664-fc0d222c****) //消费组配置 .setGroupId(consumeGroupID) .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) //安全模式,tls只支持SASL_SSL .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL") //鉴权模式,tls只支持PLAIN .setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN") //账号密码设置: //username:用户名,应配置为日志服务的日志项目ID //password:密码,应配置为火山引擎账户密钥,格式为 ${access-key-id}#${access-key-secret}。其中: //${access-key-id} 应替换为您的 AccessKey ID。 //${access-key-secret} 应替换为您的 AccessKey Secret。 .setProperty(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule " + "required username=\"" + bd4d5b20-479a-4f33-9099-a44db3be**** + "\" password=\"" + AK****NWE2MTI#WldWa1p****dZd05JKJKHHUI== + "\";") .build(); DataStreamSource<String> data = env.fromSource(tlsKafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source"); // todo 自定义sink,需要用户自己实现自己的sink。 Sink<String> customizedSink = CreateSink(); data.sinkTo(customizedSink); env.execute("Flink Tls Kafka Job");