Java SDK
Last updated: 2024.07.09 17:02:42 · First published: 2022.07.26 15:08:01

This topic describes how to connect to the cloud-native message engine BMQ and send and receive messages through the Java SDK.

Prerequisites

  • Create a resource instance and obtain the endpoint address. For details, see Manage Resource Pools.
  • Install a Java environment in advance, including JDK 1.8 or later and Maven 3.5 or later. For details, see Install JDK and Install Maven.
  • Open port 9092 in the security group that the instance belongs to; a quick connectivity check is sketched after this list. For details, see Add Security Group Access Rules.
  • (Optional) If you want to authenticate with a SASL username and password, create a user and obtain its password in advance. For details, see Create a SASL User.
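Before running the demos, you can confirm that port 9092 on the endpoint is reachable from your client machine. A minimal sketch in plain Java (the host value is a placeholder; substitute the endpoint address shown in the console):

import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder: replace with the endpoint address from the console
        String host = "xxx.";
        try (Socket socket = new Socket()) {
            // Throws an exception if the port is not reachable within 3 seconds
            socket.connect(new InetSocketAddress(host, 9092), 3000);
            System.out.println("Port 9092 is reachable.");
        }
    }
}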

Install the Java dependency

Add the following dependency to the pom.xml file of your Maven project:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

Send messages

Write and run BmqProducerDemo.java to send messages. The snippets below are shown without their surrounding class; a skeleton they assume is sketched next.
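Each protocol-specific snippet references a logger and several Kafka classes without showing the boilerplate around them. A minimal skeleton, under the assumption that SLF4J is used for logging (kafka-clients already depends on the SLF4J API, but you need a binding on the classpath at runtime to see output):

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BmqProducerDemo {
    private static final Logger logger = LoggerFactory.getLogger(BmqProducerDemo.class);

    public static void main(String[] args) throws Exception {
        // Paste one of the protocol-specific snippets below here
    }
}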

PLAINTEXT

No authentication is required when you connect to a BMQ instance through the PLAINTEXT protocol endpoint.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Test message content
String value = "this is test message value.";
// Number of messages to send
int count = 100;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
    for (int i = 0; i < count; i++) {
        // Send synchronously, waiting up to 5 seconds for the result
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i))
                .get(5, TimeUnit.SECONDS);
        logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                i);
    }
} catch (Throwable e) {
    logger.error("produce error", e);
}
producer.flush();
producer.close();
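The demo sends synchronously, blocking on get() for every record. The Kafka producer also supports asynchronous sends with a completion callback, which usually yields much higher throughput; a brief sketch using the same configuration as above:

for (int i = 0; i < count; i++) {
    // send() returns immediately; the callback runs once the broker acknowledges the record
    producer.send(new ProducerRecord<>(topic, value + i), (metadata, exception) -> {
        if (exception != null) {
            logger.error("produce error", exception);
        } else {
            logger.info("sent to partition={}, offset={}", metadata.partition(), metadata.offset());
        }
    });
}
// flush() blocks until all buffered records have been transmitted
producer.flush();
producer.close();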

SASL_PLAINTEXT

Authenticate with a SASL username and password. You need the username and password of a user with the required permissions; for how to obtain them, see View SASL User Passwords.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Test message content
String value = "this is test message value.";
// Number of messages to send
int count = 100;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// SASL_PLAINTEXT configuration
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your username\" password=\"your password\";");

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
    for (int i = 0; i < count; i++) {
        // Send synchronously, waiting up to 5 seconds for the result
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i))
                .get(5, TimeUnit.SECONDS);
        logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                i);
    }
} catch (Throwable e) {
    logger.error("produce error", e);
}
producer.flush();
producer.close();

SASL_SSL

Authenticate with a SASL username and password. You need the username and password of a user with the required permissions; for how to obtain them, see View SASL User Passwords.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Test message content
String value = "this is test message value.";
// Number of messages to send
int count = 100;
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// SASL_SSL configuration
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your username\" password=\"your password\";");
// An empty value disables hostname verification of the server certificate
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
    for (int i = 0; i < count; i++) {
        // Send synchronously, waiting up to 5 seconds for the result
        RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i))
                .get(5, TimeUnit.SECONDS);
        logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.",
                recordMetadata.topic(),
                recordMetadata.partition(),
                recordMetadata.offset(),
                i);
    }
} catch (Throwable e) {
    logger.error("produce error", e);
}
producer.flush();
producer.close();

Consume messages

Write and run BmqConsumerDemo.java to consume messages. As with the producer, the snippets below assume a surrounding class; a skeleton is sketched next.
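A minimal skeleton for the consumer snippets, again assuming SLF4J for the logger:

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BmqConsumerDemo {
    private static final Logger logger = LoggerFactory.getLogger(BmqConsumerDemo.class);

    public static void main(String[] args) {
        // Paste one of the protocol-specific snippets below here
    }
}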

PLAINTEXT

No authentication is required when you connect to a BMQ instance through the PLAINTEXT protocol endpoint.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Consumer group created in the console for consuming messages
String group = "this is your group.";
// Offset reset policy: earliest, latest, or none
String offsetReset = "earliest";
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Collections.singletonList avoids a dependency on Guava's Lists helper
List<String> topicList = Collections.singletonList(topic);
consumer.subscribe(topicList);
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
    } catch (Exception e) {
        logger.error("consume error", e);
    }
}
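The loop above relies on the consumer's automatic offset commits (enable.auto.commit defaults to true). If you want offsets committed only after records have been processed, disable auto-commit and commit manually; a brief sketch:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        // Process the record before its offset is committed
        logger.info("consumed record, offset={}, value={}", record.offset(), record.value());
    }
    if (!records.isEmpty()) {
        // Synchronously commit the offsets returned by the last poll
        consumer.commitSync();
    }
}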

SASL_PLAINTEXT

Authenticate with a SASL username and password. You need the username and password of a user with the required permissions; for how to obtain them, see View SASL User Passwords.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Consumer group created in the console for consuming messages
String group = "this is your group.";
// Offset reset policy: earliest, latest, or none
String offsetReset = "earliest";
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// SASL_PLAINTEXT configuration
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your username\" password=\"your password\";");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Collections.singletonList avoids a dependency on Guava's Lists helper
List<String> topicList = Collections.singletonList(topic);
consumer.subscribe(topicList);
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
    } catch (Exception e) {
        logger.error("consume error", e);
    }
}

SASL_SSL

Authenticate with a SASL username and password. You need the username and password of a user with the required permissions; for how to obtain them, see View SASL User Passwords.

// Endpoint address, available in the console
String server = "xxx.";
// Topic created in the console for the messages
String topic = "this is your topic.";
// Consumer group created in the console for consuming messages
String group = "this is your group.";
// Offset reset policy: earliest, latest, or none
String offsetReset = "earliest";
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Set the maximum fetch size; it must be smaller than the public network bandwidth
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 512000);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512000);

// SASL_SSL configuration
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"your username\" password=\"your password\";");
// An empty value disables hostname verification of the server certificate
properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Collections.singletonList avoids a dependency on Guava's Lists helper
List<String> topicList = Collections.singletonList(topic);
consumer.subscribe(topicList);
while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}",
                    record.topic(),
                    record.partition(),
                    record.offset(),
                    record.key(),
                    record.value());
        }
    } catch (Exception e) {
        logger.error("consume error", e);
    }
}