本文以调用 Java SDK 为例,介绍如何通过 SDK 接入 BMQ 实例并收发消息。
在 pom.xml
文件中添加以下依赖。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> </dependency>
编写并运行BmqProducerDemo.java发送消息。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //测试消息内容 String value = "this is test message value."; //发送消息条数 int count = 100; Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(properties); try { for (int i = 0; i < count; i++) { RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, value + i++)) .get(5, TimeUnit.SECONDS); logger.info("recordMetadata topic={}, partition={}, offset={}, count = {}.", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), i); } } catch (Throwable e) { logger.error("produce error", e); } producer.flush(); producer.close();
编写并运行BmqConsumerDemo.java消费消息。
//在控制台查看对应接入点信息 String server = "xxx."; //在控制台申请的消息所属Topic String topic = "this is your topic."; //在控制台申请消费消息的consumerGroup String group = "this is your group."; //消费offset策略:earliest, latest, none String offsetReset = "earliest"; Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); properties.put(ConsumerConfig.GROUP_ID_CONFIG, group); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); List<String> topicList = Lists.newArrayList(topic); consumer.subscribe(topicList); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { logger.info("consumed record, topic={}, partition={}, offset={}, key={}, value={}", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } catch (Exception e) { logger.error("consume error", e); } }