You need to enable JavaScript to run this app.
导航
默认接入点收发消息
最近更新时间:2024.04.03 15:28:17首次发布时间:2022.01.28 15:16:00

本文以 Java 客户端为例,介绍如何在 VPC 环境下通过默认接入点(PLAINTEXT)接入消息队列 Kafka版,并收发消息。

前提条件

已完成准备工作。详细说明请参考准备工作

1 安装Java依赖库

在 Java 项目的 pom.xml 中添加相关依赖。此处以 Kafka 2.2.2 版本为例。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>

2 添加配置文件

  1. 创建消息队列 Kafka版配置文件 config.properties
    配置文件字段的详细说明,请参考配置文件
    使用默认接入点时,配置文件示例如下。

    bootstrap.servers=xxxxx
    security.protocol=PLAINTEXT
    topic=my-topic
    consumer.group.id=test
    consumer.auto.offset.reset=earliest
    consumer.enable.auto.commit=false
    client.dns.lookup=use_all_dns_ips
    
  2. 创建配置文件加载程序 KafkaConfigurer.java

    package com.volcengine.openservice.kafka;
    
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;
    
    public class KafkaConfigurer {
    
        private static Properties properties;
    
        public synchronized static Properties getKafkaProperties(String path) {
            if (null != properties) {
                return properties;
            }
            //获取配置文件config.properties的内容
            Properties kafkaProperties = new Properties();
            try {
                FileInputStream conf = new FileInputStream(path);
                kafkaProperties.load(conf);
            } catch (Exception e) {
                //没加载到文件,程序要考虑退出
                e.printStackTrace();
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }
    

3 发送消息

实现方法

  1. 创建发送消息程序 ProducerDemo.java

  2. 编译并运行 ProducerDemo.java 发送消息。

  3. 查看运行结果。
    运行结果示例如下。

    Produce ok:sasl-0@0
    Produce ok:sasl-0@1
    Produce ok:sasl-0@2
    Produce ok:sasl-0@3
    

    说明

    消息队列 Kafka版提供示例 Demo 供您快速接入,下载并解压缩 Demo后,可以直接执行以下命令发送消息。

    java -cp kafka-demo.jar com.volcengine.openservice.kafka.ProducerDemo ./config.properties
    

示例代码

通过默认接入点生产消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ProducerDemo.java,实现相关业务逻辑。

package com.volcengine.openservice.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class Producer {
    // 生产者使用的topic
    private static String topic;

    // 生产者使用的配置
    private static Properties props  = new Properties();

    private static KafkaProducer<String, String> producer;

    // 构造生产者
    public Producer(Properties kafkaProperties) {
        setProps(kafkaProperties);
        setSasl(kafkaProperties);
        setTopic(kafkaProperties);
        newProducer();
    }

    // 设置生成的topic, 请在控制台申请之后,填写在这里
    private void setTopic(Properties kafkaProperties) {
        topic  = kafkaProperties.getProperty("topic");
    }

    // 设置安全协议, kafka支持SASL_PLAINTEXT和PLAINTEXT协议
    private void setSasl(Properties kafkaProperties) {
        String protocol = kafkaProperties.getProperty("security.protocol");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
        // 如果安全协议为PLAINTEXT,则不需要填充用户名和密码,直接返回
        if (protocol.equals("PLAINTEXT")) {
            return;
        }

        // 如果安全协议为SASL_PLAINTEXT,需要获取加密类型以及sasl的config
        if (protocol.equals("SASL_PLAINTEXT")) {
            props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism"));
            props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config"));
            return;
        }

        throw new IllegalArgumentException("security.protocol is not correct");
    }

    // 设置生产者的启动参数
    private void setProps(Properties kafkaProperties) {
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //Kafka消息的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //请求的最长等待时间
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
        //设置客户端内部重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        //设置客户端内部重试间隔
        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
        // 设置客户端的dns策略
        props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup"));
    }

    // 构造生产者对象,也即生成一个生产实例
    private void newProducer() {
        // 构造Producer对象,注意,该对象是线程安全的,一般来说,一个进程内一个Producer对象即可;
        // 如果想提高性能,可以多构造几个对象,但不要太多,最好不要超过5个
        producer  = new KafkaProducer<String, String>(props);
    }

    public void Produce() {
        //构造一个Kafka消息
        String value = "this is demo message "; //消息的内容

        //批量获取futures可以加快速度,但注意批量不要太大
        List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
        for (int i =0; i < 100; i++) {
            //发送消息,并获得一个Future对象
            ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            futures.add(metadataFuture);
        }
        producer.flush();
        for (Future<RecordMetadata> future: futures) {
            //同步获得Future对象的结果
            try {
                RecordMetadata recordMetadata = future.get();
                System.out.println("Produce ok:" + recordMetadata.toString());
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }
}

public class ProducerDemo {

    public static void main(String args[]) {
        if (args.length < 1) {
            System.out.println("please input config path");
            return;
        }
        // 获取生产者对象
        Producer producer = new Producer(KafkaConfigurer.getKafkaProperties(args[0]));

        try {
            // 生产消息
            producer.Produce();
        } catch (Exception e) {
            // 客户端内部重试之后,仍然发送失败,业务要应对此类错误
            System.out.println("error occurred");
            e.printStackTrace();
        }
    }


}

4 消费消息

实现方法

  1. 创建 Consumer 订阅消息程序 ConsumerDemo.java

  2. 编译并运行 ConsumerDemo.java 消费消息。

  3. 查看运行结果。
    运行结果示例如下。

    Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 0)
    Consume: ConsumerRecord(topic = sasl, patition = 6, leaderEpoch=0, offset=0, CreateTime = 1637207092476, serialized key size = -1, serialized key size = 24, headers = ReacordHeaders(headers = [], isReadOnly = false), key = null, value = this is demo, message : 1)
    

    说明

    消息队列 Kafka版提供示例 Demo供您快速接入,下载并解压缩 Demo 后,可以直接执行以下命令消费消息。

    java -cp kafka-demo.jar com.volcengine.openservice.kafka.ConsumerDemo ./config.properties
    

示例代码

通过默认接入点消费消息的示例代码如下,您也可以参考 Demo 中的示例文件 {DemoPath}/src/main/java/com/volcengine/openservice/kafka/ConsumerDemo.java,实现相关业务逻辑。

package com.volcengine.openservice.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;

class Consumer {
    // 消费者的启动配置
    private static Properties props  = new Properties();

    private KafkaConsumer<String, String> consumer;

    // 消费者实例构造函数
    public Consumer(Properties kafkaProperties) {
        setProps(kafkaProperties);
        setSasl(kafkaProperties);
        newConsumer();
        subscribed(kafkaProperties);
    }

    // 设置安全协议, kafka支持SASL_PLAINTEXT和PLAINTEXT协议
    private void setSasl(Properties kafkaProperties) {
        String protocol = kafkaProperties.getProperty("security.protocol");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol);
        // 如果安全协议为PLAINTEXT,则不需要填充用户名和密码,直接返回
        if (protocol.equals("PLAINTEXT")) {
            return;
        }

        // 如果安全协议为SASL_PLAINTEXT,需要获取加密类型以及sasl的config
        if (protocol.equals("SASL_PLAINTEXT")) {
            props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getProperty("sasl.mechanism"));
            props.put(SaslConfigs.SASL_JAAS_CONFIG, kafkaProperties.getProperty("sasl.jaas.config"));
            return;
        }

        throw new IllegalArgumentException("security.protocol is not correct");
    }

    // 设置消费者的启动参数
    private void setProps(Properties kafkaProperties) {
        //设置接入点,请通过控制台获取对应Topic的接入点
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));

        //可根据实际拉取数据和客户的版本等设置此值,默认30s
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        //每次poll的最大数量
        //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
        //消息的反序列化方式
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //当前消费实例所属的消费组
        //属于同一个组的消费实例,会负载消费消息
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("consumer.group.id"));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getProperty("consumer.auto.offset.reset"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getProperty("consumer.enable.auto.commit"));
        // 设置客户端的dns策略
        props.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, kafkaProperties.getProperty("client.dns.lookup"));
    }

    // 构造消费者对象,也即生成一个消费实例
    private void newConsumer() {
        consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
    }

    // 订阅topic
    private void subscribed(Properties kafkaProperties) {
        //设置消费组订阅的Topic
        List<String> subscribedTopic =  new ArrayList<String>();
        //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
        //Topic需要先在控制台进行创建
        subscribedTopic.add(kafkaProperties.getProperty("topic"));
        consumer.subscribe(subscribedTopic);
    }

    // 消费消息
    public void poll() {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        //必须在下次poll之前消费完这些数据, 且总耗时不得超过SESSION_TIMEOUT_MS_CONFIG
        //建议开一个单独的线程池来消费消息,然后异步返回结果
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Consume: %s%n", record.toString());
        }
    }
}

public class ConsumerDemo {
    public static void main(String args[]) {
        if (args.length < 1) {
            System.out.println("please input config path");
            return;
        }

        // 获取消费者对象
        Consumer consumer = new Consumer(KafkaConfigurer.getKafkaProperties(args[0]));

        //循环消费消息
        while (true){
            try {
                // 消费消息
                consumer.poll();
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (Throwable ignore) {

                }
                e.printStackTrace();
            }
        }
    }
}