本篇文章为你整理了EMQX 入门实战(2)()的详细内容,包含有 EMQX 入门实战(2),希望能帮助你了解 EMQX 入门实战(2)。
Eclipse Paho Java Client是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android);本文主要介绍使用如何使用它来操作 EMQX,文中所使用到的软件版本:EMQX 4.2.2、Paho 1.2.5、Java 1.8.0_321。
1、引入依赖
dependency
groupId org.eclipse.paho /groupId
artifactId org.eclipse.paho.client.mqttv3 /artifactId
version 1.2.5 /version
/dependency
2、同步方式收发消息
2.1、发送消息
public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
2.2、接受消息
public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
2.3、完整例子
package com.abc.demo.emqx;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttClientCase.class.getName());
private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws Exception {
new Thread(() - subscribe("client-subscribe-A")).start();
new Thread(() - subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() - publish()).start();
Thread.sleep(1000 * 60);
public static void publish() {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
mqttClient.connect(mqttConnectOptions);
for (int i = 0; i i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
mqttClient.publish(TOPIC_NAME, mqttMessage);
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
public static void subscribe(String clientId) {
try {
MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
mqttClient.connect(mqttConnectOptions);
mqttClient.subscribe(TOPIC_NAME);
} catch (Exception e) {
e.printStackTrace();
}
MqttClientCase.java
3、异步方式收发消息
3.1、发送消息
public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
}
3.2、接受消息
public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
3.2、完整例子
package com.abc.demo.emqx;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MqttAsyncClientCase {
private static Logger logger = LoggerFactory.getLogger(MqttAsyncClientCase.class.getName());
private static final String SERVER_URI = "tcp://10.49.196.10:1883";
private static final String TOPIC_NAME = "test-topic";
public static void main(String[] args) throws Exception {
new Thread(() - subscribe("client-subscribe-A")).start();
new Thread(() - subscribe("client-subscribe-B")).start();
Thread.sleep(1000);
new Thread(() - publish()).start();
Thread.sleep(1000 * 60);
public static void publish() {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "test-client-publish", new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/*会话清除标识
* false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。
* true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttConnectOptions.setKeepAliveInterval(10);
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("发送完成:{}", token.isComplete());
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
for (int i = 0; i i++) {
MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());
mqttMessage.setQos(2);
//是否保留消息,只能保留最新的一份
mqttMessage.setRetained(false);
IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);
iMqttDeliveryToken.waitForCompletion();
mqttClient.disconnect().waitForCompletion();
mqttClient.close();
} catch (Exception e) {
e.printStackTrace();
public static void subscribe(String clientId) {
try {
MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//设为 false, 该订阅将被视为持久订阅
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setUserName("admin");
mqttConnectOptions.setPassword("123456".toCharArray());
mqttClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开:{}", cause.getMessage());
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);
iMqttToken.waitForCompletion();
mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
MqttAsyncClientCase.java
以上就是EMQX 入门实战(2)()的详细内容,想要了解更多 EMQX 入门实战(2)的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。