You need to enable JavaScript to run this app.
导航
Java SDK(MQTT 协议)
最近更新时间:2024.01.26 16:09:57首次发布时间:2023.11.10 16:08:12

本文以调用 Client SDK for Java 为例,介绍通过开源 SDK 实现 MQTT 协议下消息收发的操作过程,并提供对应的示例代码。其他语言或框架的 SDK 消息收发过程相似。

注意事项

  • 本文档以 MQTT 协议下收发消息为例,演示消息生产与消费的示例代码。请根据业务的协议类型,在控制台中复制 MQTT 接入点进行相关的参数配置。
  • 在消息收发过程中,请使用长连接,避免在每条消息的收发过程中创建连接,否则会造成大量不必要的资源消耗,甚至引起服务端 SYN Flood 防护。

示例项目

RabbitMQ 收发消息的示例代码请查看 RabbitMQ 的官方文档 RabbitMQ SDK 使用样例GitHub 中的常见场景的示例代码工程 rabbitmq-tutorials
除此之外,消息队列 RabbitMQ版提供示例项目 MQTT 协议的 Java SDK Demo 供您参考。

环境准备

  1. 在客户端环境安装 JDK8 或更高版本的 JDK。
  2. 添加依赖。
    • 如果项目使用 Maven 构建,请在 pom.xml文件中增加如下依赖。

      <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
      </dependency>
      
    • 如果项目使用 Gradle 构建,请添加如下依赖:

      compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
      

接入配置

在使用 Client 接入 RabbitMQ 实例进行消息生产和消费时,需在代码中配置如下参数,请确保下列参数在 Client 代码中配置正确。

参数名

参数说明

host

接入点域名,进入火山引擎 RabbitMQ 控制台实例详情页面,取 MQTT接入点展示的域名部分,例如 rbtmq-xxxxxxx.rabbitmq.ivolces.com。

port

接入点端口,进入火山引擎 RabbitMQ 控制台实例详情页面,取MQTT接入点展示的端口部分,例如8883。

username

接入用户名,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过 Web UI接入点进入 RabbitMQ 集群的Web控制台页面,在 Users 标签页查看。
请确保使用的 User 已经绑定了待访问 vhost,具体操作请参见绑定用户和 Vhost

password

接入用户密码,请进入火山引擎 RabbitMQ 控制台实例详情页面,通过Web UI接入点进入 RabbitMQ 集群的 Web 控制台页面,在 Users 标签页查看。

消息生产

消息生产的示例代码如下:

package org.example;
import org.eclipse.paho.client.mqttv3.*;


public class RabbitMQMQTTProducer {
    //发布客户端
    public static void main(String[] args) {
        String url = "ssl://xxxxxx.rabbitmq.volces.com:8883"; // MQTT代理服务器地址,公网连接。
        /* String url = "tcp://xxxxxx.rabbitmq.ivolces.com:1883"; // MQTT代理服务器地址,私网连接。 */
        String clientId = "publisher-xxx"; // 客户端ID,用户自定义。
        String topic = "my-topic"; // 发布的主题。
        String message = "Hello, MQTT!"; // 发布的消息内容。
        String username = "demo-user"; // MQTT 代理服务器的用户名。
        String password = "demo-user-password"; // MQTT 代理服务器的密码。

        try {
            MqttClient client = new MqttClient(url, clientId);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());

            // 建立连接
            client.connect(connOpts);
            // 创建消息对象
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(1); // 设置消息的 QoS (Quality of Service) 等级

            // 发布消息
            client.publish(topic, mqttMessage);
            // 断开连接
            client.disconnect();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

消息消费

消息消费的示例代码如下:

import org.eclipse.paho.client.mqttv3.*;

public class RabbitMQMQTTConsumer {
    public static void main(String[] args) {
        String broker = "ssl://xxxxxx.rabbitmq.volces.com:8883"; // MQTT 代理服务器地址,公网地址。
        /* String url = "tcp://xxxxxx.rabbitmq.ivolces.com:1883"; // MQTT代理服务器地址,私网地址。*/
        String clientId = "consumer-xxx"; // 客户端 ID
        String topic = "my-topic"; // 订阅的主题
        String username = "demo-user"; // MQTT 代理服务器的用户名
        String password = "demo-user-password"; // MQTT 代理服务器的密码

        try {
            MqttClient client = new MqttClient(broker, clientId);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            connOpts.setUserName(username);
            connOpts.setPassword(password.toCharArray());

            // 设置消息到达回调处理
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    System.out.println("Connection lost: " + cause.getMessage());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    System.out.println("Received message from topic: " + topic);
                    System.out.println("Message: " + new String(message.getPayload()));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                }
            });

            // 建立连接
            client.connect(connOpts);

            // 订阅主题
            client.subscribe(topic);

            // 等待消息到达
            synchronized (client) {
                client.wait();
            }

            // 断开连接
            client.disconnect();
        } catch (MqttException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}