本文以调用 Client SDK for Java 为例,介绍通过开源 SDK 实现 MQTT 协议下消息收发的操作过程,并提供对应的示例代码。其他语言或框架的 SDK 消息收发过程相似。
RabbitMQ 收发消息的示例代码请查看 RabbitMQ 的官方文档 RabbitMQ SDK 使用样例 和 GitHub 中的常见场景的示例代码工程 rabbitmq-tutorials。
除此之外,消息队列 RabbitMQ版提供示例项目 MQTT 协议的 Java SDK Demo 供您参考。
如果项目使用 Maven 构建,请在 pom.xml
文件中增加如下依赖。
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
如果项目使用 Gradle 构建,请添加如下依赖:
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
在使用 Client 接入 RabbitMQ 实例进行消息生产和消费时,需在代码中配置如下参数,请确保下列参数在 Client 代码中配置正确。
参数名 | 参数说明 |
---|---|
host | 接入点域名,进入火山引擎 RabbitMQ 控制台实例详情页面,取 MQTT接入点展示的域名部分,例如 rbtmq-xxxxxxx.rabbitmq.ivolces.com。 |
port | 接入点端口,进入火山引擎 RabbitMQ 控制台实例详情页面,取MQTT接入点展示的端口部分,例如8883。 |
username | 接入用户名,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过 Web UI接入点进入 RabbitMQ 集群的Web控制台页面,在 Users 标签页查看。 |
password | 接入用户密码,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过Web UI接入点进入 RabbitMQ 集群的 Web 控制台页面,在 Users 标签页查看。 |
消息生产的示例代码如下:
package org.example; import org.eclipse.paho.client.mqttv3.*; public class RabbitMQMQTTProducer { //发布客户端 public static void main(String[] args) { String url = "ssl://xxxxxx.rabbitmq.volces.com:8883"; // MQTT代理服务器地址,公网连接。 /* String url = "tcp://xxxxxx.rabbitmq.ivolces.com:1883"; // MQTT代理服务器地址,私网连接。 */ String clientId = "publisher-xxx"; // 客户端ID,用户自定义。 String topic = "my-topic"; // 发布的主题。 String message = "Hello, MQTT!"; // 发布的消息内容。 String username = "demo-user"; // MQTT 代理服务器的用户名。 String password = "demo-user-password"; // MQTT 代理服务器的密码。 try { MqttClient client = new MqttClient(url, clientId); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); // 建立连接 client.connect(connOpts); // 创建消息对象 MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttMessage.setQos(1); // 设置消息的 QoS (Quality of Service) 等级 // 发布消息 client.publish(topic, mqttMessage); // 断开连接 client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } }
消息消费的示例代码如下:
import org.eclipse.paho.client.mqttv3.*; public class RabbitMQMQTTConsumer { public static void main(String[] args) { String broker = "ssl://xxxxxx.rabbitmq.volces.com:8883"; // MQTT 代理服务器地址,公网地址。 /* String url = "tcp://xxxxxx.rabbitmq.ivolces.com:1883"; // MQTT代理服务器地址,私网地址。*/ String clientId = "consumer-xxx"; // 客户端 ID String topic = "my-topic"; // 订阅的主题 String username = "demo-user"; // MQTT 代理服务器的用户名 String password = "demo-user-password"; // MQTT 代理服务器的密码 try { MqttClient client = new MqttClient(broker, clientId); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); // 设置消息到达回调处理 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { System.out.println("Connection lost: " + cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("Received message from topic: " + topic); System.out.println("Message: " + new String(message.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }); // 建立连接 client.connect(connOpts); // 订阅主题 client.subscribe(topic); // 等待消息到达 synchronized (client) { client.wait(); } // 断开连接 client.disconnect(); } catch (MqttException | InterruptedException e) { e.printStackTrace(); } } }