You need to enable JavaScript to run this app.
导航
TLS Kafka
最近更新时间:2025.03.03 09:41:51首次发布时间:2025.03.03 09:41:51
我的收藏
有用
有用
无用
无用

TLS 日志服务提供 Kafka 协议消费功能,可以将一个日志主题当作一个 Kafka Topic 来消费,每条日志对应一条 Kafka 消息。您可以使用 Flink kafka 连接器连接日志服务,通过 Flink 任务将日志服务中采集的日志数据消费到下游的大数据组件或者数据仓库。

前置条件

  1. 已经控制台开通 TLS 日志服务和流式计算 Flink 版服务。
  2. 利用 Kafka 协议消费日志需要确保 TLS 控制台已经开通 Kafka 协议消费功能。并且需要保证 TLS 和 Flink 属于同一个 Region 使用私网进行连接。
  3. Flink 当前版本建议使用 Flink-1.16-volcano 及以上版本。

DDL 定义

用作数据源(Source)

CREATE TABLE kafka_source (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
) WITH (
     'connector' = 'kafka',
     -- 消费 topic 需要加前缀 out-
     'topic' = 'out-${TLS_TOPIC_ID}',
     -- 参考 TLS 文档站通过私网域名进行访问,9093 端口
     'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9093',
     -- 配置安全协议为 SASL_SSL,验证机制为 PLAIN
     'properties.security.protocol' = 'SASL_SSL',
     'properties.sasl.mechanism' = 'PLAIN',
     -- 配置 JAAS。用户名密码分别填写 TLS 项目 ID 和 AK 、SK 内容
     'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
     -- 指定消费者组,和消费模式以及解析格式
     'properties.group.id' = 'test_topic_01',
     'scan.startup.mode' = 'latest-offset',
     'format' = 'json'
 );

用作数据目的(Sink)

CREATE TABLE kafka_sink (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
 ) WITH (
   'connector' = 'kafka',
   -- 结合具体地域选择相应的私网域名 9094 端口
   'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9094',
   --关闭幂等。
   'properties.enable.idempotence' = 'false',   
   -- 配置安全协议为 SASL_SSL,机制为 PLAIN
   'properties.security.protocol' = 'SASL_SSL',
   'properties.sasl.mechanism' = 'PLAIN',
   -- 配置 JAAS。用户名密码分别填写项目id AK 、SK 内容
   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
   -- 注意写入的时候 topic 为 TLS 的 Topic id
   'topic' = '${TLS_TOPIC_ID}',
   'format' = 'json'
 );

WITH 参数

通用和消费者参数

参数

是否必选

默认值

数据类型

描述

connector

(none)

String

指定使用的连接器,TLS 当前支持 Kafka Connector 上传、消费 数据。

注意

Kafka-0.10 和 Kafka-0.11 两个版本的连接器使用的 Kafka 客户端有缺陷,在某些情况下可能无法自动提交 Kafka offset 信息。

topic

(none)

String

指定 TLS 日志 Topic ID。注意:

  1. 消费的时候需要采用 'topic' = 'out-${TLS_TOPIC_ID}'格式,需要在 Topic ID 迁增加 out-前缀
  2. 写入的时候需要采用 'topic' = '${TLS_TOPIC_ID}'格式,不需要增加 out- 前缀

properties.bootstrap.servers

(none)

String

指定 TLS 的访问地址,可以参考 TLS 服务地址。注意:

  1. 生产者:一般是 9094 端口服务
  2. 消费者:需要控制台开启 Kafka 协议消费,一般是 9093 端口服务

properties.group.id

(none)

String

指定 TLS 消费组的 ID。

format

(none)

String

用来反序列化 TLS 返回消息体(value)时使用的格式。常见的日志格式如下:

  • JSON
  • CSV
  • RAW

scan.startup.mode

group-offsets

String

读取数据时的启动模式。 取值如下:

  • latest-offset:从 Kafka 最早分区开始读取。
  • latest-offset:从 Kafka 最新位点开始读取。
  • group-offsets:默认值,根据 Group 读取。
  • timestamp:从 Kafka 指定时间点读取。需要在 WITH 参数中指定 scan.startup.timestamp-millis 参数。
  • specific-offsets:从 Kafka 指定分区目标偏移量读取。需要在 WITH 参数中指定 scan.startup.specific-offsets 参数。

scan.startup.specific-offsets

(none)

String

specific-offsets 启动模式下,指定每个分区的启动偏移量。如partition:0,offset:42;partition:1,offset:300

scan.startup.timestamp-millis

(none)

Long

timestamp 启动模式下,指定启动位点时间戳,单位毫秒。

scan.parallelism

(none)

Integer

单独设置 Source 并发。如果不设置,则并行度为作业的默认并发数。
该参数经常用于 Source 和下游算子需要断开算子链的场景,使得下游重计算的算子能使用较大的默认并发,提高计算能力,同时保持 Source 并发和 Kafka 分区数相等,此时 Source 到下游由于并发不同,数据 Shuffle 是均匀的,从而提高了整体计算速率。

生产者参数

另外作为生产者参数,TLS Kafka 支持如下参数类型:

参数

是否必选

默认值

数据类型

描述

properties.enable.idempotence

true

Boolean

如果是写入 TLS 服务,需要关闭幂等。设置为 false 。

注意

如果您通过 Kafka 连接器写入 TLS 日志服务,那么必须将properties.enable.idempotence参数设置为 false 以关闭幂等,否则任务会运行失败。

properties.compression.type

(none)

string

用于指定消息在发送到 Kafka 之前的压缩算法。推荐使用 lz4 算法。

properties.batch.size

16

string

单个 Partition 对应的 Batch 中支持写入的最大字节数,默认值为 16 KB。

  • batch.size=单个 Producer 的消息QPS * 消息大小 * liner.ms/ Partition 数
  • 提升 batch.size 的值,一个 Batch 能写入更多数据,可以提升吞吐量。但是 batch.size 也不能设置太大,以免出现 Batch 迟迟写不满,导致发送消息延迟高。
  • 一般与 properties.linger.msproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

推荐值

  • 设置了压缩:262144
  • 未设置压缩:2621440

properties.linger.ms

0

string

消息在 Batch 中的停留时间,即发送消息前的等待时长。默认为 0 毫秒,表示“立即发送消息”。

  • 可以适当提升 linger.ms 取值,以引入小延迟为代价,提高吞吐量和压缩率。推荐值 100 ms。
  • 该参数一般与 properties.batch.sizeproperties.buffer.memory 参数联合使用,满足任意一个条件都会立即发送消息。

properties.max.request.size

(none)

Integer

用于指定单个请求(一个请求中可能有多个分区)的最大大小。单位:字节。
推荐值

  • 设置了压缩:1046576
  • 未设置压缩:10465760

properties.buffer.memory

32M

string

缓存消息的总可用 Memory 空间,如果 Memory 用完,则会立即发送缓存中的消息。

  • 设置时,建议按照计算公式设置:buffer.memory>=batch.size * partition数*2。
  • 该参数一般与 properties.batch.sizeproperties.linger.ms 参数联合使用,满足任意一个条件都会立即发送消息。

说明

如果 buffer.memory 较小,可能会造成 Batch 失效,从而导致 QPS 升高被下游限流等问题。

sink.partitioner

fixed

String

Flink 分区到 TLS 分区的映射关系。取值如下:

  • fixed(默认值):每个 Flink 分区对应一个 TLS 分区。
  • round-robin:Flink 分区中的数据将被轮流分配至 Kafka 的各个分区。
  • 自定义映射模式:支持创建一个 FlinkKafkaPartitioner 的子类来自定义分区映射模式。例如org.mycompany.MyPartitioner

sink.parallelism

(none)

Integer

单独设置 TLS Sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

SQL 示例代码

源表示例

以下是 TLS 作为源表的消费方式:

CREATE TABLE tls_source (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
) WITH (
   'connector' = 'kafka',
   'topic' = 'out-${TLS_TOPIC_ID}',
   'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9093',
   'properties.security.protocol' = 'SASL_SSL',
   'properties.sasl.mechanism' = 'PLAIN',
   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
   'properties.group.id' = 'test_topic_01',
   'scan.startup.mode' = 'latest-offset',
   'format' = 'json'
 );

CREATE TABLE print_sink (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
) WITH (
   'connector' = 'print'           
);
insert into print_sink
select * from tls_source;

生产者示例

以下是 TLS 作为生产者的结果表示例代码:

CREATE TABLE datagen_source (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
) WITH (
   'connector' = 'datagen',
   'rows-per-second' = '5'          
);

CREATE TABLE tls_sink (
   user_id BIGINT,
   item_id BIGINT,
   behavior STRING
 ) WITH (
   'connector' = 'kafka',
   'properties.bootstrap.servers' = 'tls-cn-beijing.ivolces.com:9094',
   'properties.enable.idempotence' = 'false',   
   'properties.security.protocol' = 'SASL_SSL',
   'properties.sasl.mechanism' = 'PLAIN',
   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="${TLS_PROJECT_ID}" password="${ACCESS_KEY}#${SECRET_KEY}";',
   'topic' = '${TLS_TOPIC_ID}',
   'format' = 'json'
 );

insert into tls_sink
select * from datagen_source;

JAVA 示例代码

POM 文件依赖,其中 Flink 版本当前支持 1.11、1.16 和 1.17 等选项。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

消费者示例

TLS 消费代码示例:

package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class KafkaConsumerJob {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule" +
                " required username=\"${TLS_PROJECT_ID}\"" +
                " password=\"${ACCESS_KEY}#${SECRET_KEY}\";";

        // 配置 Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                // 设置消费
                .setBootstrapServers("tls-cn-beijing.ivolces.com:9093")
                .setTopics("out-${TLS_TOPIC_ID}")
                .setGroupId("flink-consumer-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", jaas)
                .build();

        // 添加 source 并处理数据
        DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // 添加数据处理逻辑
        stream.map(value -> {
            System.out.println("Received message: " + value);
            return value;
        });
        // 执行任务
        env.execute("Kafka Consumer Job");
    }
}

生产者示例

TLS 写入代码示例:

package com.example;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

import java.time.LocalDateTime;
import java.util.Properties;

public class KafkaProducerJob {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建数据源
        DataStream<String> sourceStream = env.addSource(new CustomDataSource());
        // 认证信息中注意填写 project-id、AK、SK
        String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule" +
                " required username=\"${project-id}\"" +
                " password=\"${ACCESS_KEY}#${SECRET_KEY}\";";

        // 配置 Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                // 结合具体地域修改私网访问地址
                .setBootstrapServers("tls-cn-beijing.ivolces.com:9094")
                // 注意配置 topic-id 
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("${topic-d}")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                // 配置认证、关闭幂等特性等配置
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("enable.idempotence", "false")
                .setProperty("sasl.mechanism", "PLAIN")
                .setProperty("sasl.jaas.config", jaas)
                .build();
        // 将数据写入 Kafka
        sourceStream.sinkTo(sink);
        // 执行任务
        env.execute("Kafka Producer Job");
    }

    // 自定义数据源
    public static class CustomDataSource implements SourceFunction<String> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (isRunning) {
                // 模拟 json 数据写入
                String message = "{\"a\": 1, \"b\": 2}";
                ctx.collect(message);
                Thread.sleep(500); // 每秒生成一条数据
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

FAQ

丢状态重启

如果修改了作业的拓扑结构,新增了一些算子,则不能从历史状态恢复,此时可以丢状态,并从 timestamp 或者 group-offsets 模式启动:

  • 从 timestamp 时间戳恢复

适用于之前的作业没开过 checkpoint。需要从数据曲线-业务延迟监控里,找到当前消费延迟,反推出当前消费的时间点,从这个时间点恢复即可。
比如业务延迟是 1h,当前时刻是 12 点,则反推出已消费时间点是 11 点,为了避免丢数,则可以根据情况往前多消费一点,比如往前 30min,那么要设置的恢复的时刻就是 10:30。
参数如下:

'properties.scan.startup.mode' = 'timestamp',
'properties.scan.startup.timestamp-millis' = 'xxxx' -- 毫秒时间戳,可以从 https://tool.chinaz.com/tools/unixtime.aspx 获取

  • 从 group offsets 恢复

适用于之前的作业开过 checkpoint,并完成过 checkpoint。由于 checkpoint 时,kafka source 会向 kafka 实例提交 offset,因此 offset 会保存一份到 kafka 实例。
此时需要把 kafka source 里的 startup mode 改为 group-offsets,参数如下:

'properties.scan.startup.mode' = 'group-offsets'

注意,如果这个消费者组之前没有 offset 信息,第一次启动的时候会报错无法获取 partition 的 offset 信息,此时需要设置如下 kafka 参数:

'properties.auto.offset.reset' = 'earliest'

此参数意义为如果没有 group-offset 则从最老的位置进行消费

自动提交 Offsets 机制

目前一般使用以下两种方式自动提交 Kafka Offsets。

  • 方式 1:依赖 Flink 任务 Checkpoint。
    Flink 任务开启 Checkpoint 时,Kafka Source 在完成 Checkpoint 时会提交当前的消费位点,以保证 Flink 的 Checkpoint 状态和 Kafka Broker 上的提交位点一致。

    注意

    依赖 Flink 任务 Checkpoint 来管理 Kafka Offsets 时,如果上游数据量很大,很可能会触发上游的 LAG 告警。

  • 方式 2:依赖 Kafka Consumer 的位点定时提交逻辑。
    当 Flink 任务没有开启 Checkpoint 时,Kafka Source 将依赖 Kafka Consumer 的位点定时提交逻辑。您可以通过设置enable.auto.commitauto.commit.interval.ms两个参数来控制位点定时自动提交。
    -- 是否自动提交 Offsets。取值为 true 表示自动提交 Offsets;取值为 false,表示手动同步或异步提交。
      'enable.auto.commit' = 'true',
      -- 自动提交 Offsets 的时间间隔,单位为 ms。
      'auto.commit.interval.ms' = '500',