本文介绍如何通过 Java SDK 接入云原生消息引擎 BMQ 并收发消息。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
编写并运行 BmqProducerDemo.java,发送消息。
使用 PLAINTEXT 协议接入点地址连接 BMQ 实例时,无需鉴权。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //测试消息内容 String value = "this is test message value."; //发送消息条数 int count = 100; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { for (int i = 0; i < count; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++)) .get(5, TimeUnit.SECONDS); logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), i); } } catch (Throwable e) { logger.error("produce error", e); } producer.flush(); producer.close();
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //测试消息内容 String value = "this is test message value."; //发送消息条数 int count = 100; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //SASL_PLAINTEXT配置 properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"用户名\" password=\"密码\";"); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { for (int i = 0; i < count; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++)) .get(5, TimeUnit.SECONDS); logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), i); } } catch (Throwable e) { logger.error("produce error", e); } producer.flush(); producer.close();
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //测试消息内容 String value = "this is test message value."; //发送消息条数 int count = 100; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //SASL_SSL配置 properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"用户名\" password=\"密码\";"); properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { for (int i = 0; i < count; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++)) .get(5, TimeUnit.SECONDS); logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), i); } } catch (Throwable e) { logger.error("produce error", e); } producer.flush(); producer.close();
编写并运行 BmqConsumerDemo.java,消费消息。
使用 PLAINTEXT 协议接入点地址连接 BMQ 实例时,无需鉴权。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //SASL_PLAINTEXT配置 properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"用户名\" password=\"密码\";"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }
通过 SASL 用户名和密码进行鉴权。您需要获取有权限用户的名称和密码,如何获取请参见查看 SASL 用户密码。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //设置最大拉取数据,需要小于公网带宽 properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 512000); properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 512000); //SASL_SSL配置 properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"用户名\" password=\"密码\";"); properties.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }