EMQX 入门实战(2)()

  本篇文章为你整理了EMQX 入门实战(2)()的详细内容,包含有 EMQX 入门实战(2),希望能帮助你了解 EMQX 入门实战(2)。

  Eclipse Paho Java Client是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android);本文主要介绍使用如何使用它来操作 EMQX,文中所使用到的软件版本:EMQX 4.2.2、Paho 1.2.5、Java 1.8.0_321。

  1、引入依赖

  

 dependency 

 

   groupId org.eclipse.paho /groupId

   artifactId org.eclipse.paho.client.mqttv3 /artifactId

   version 1.2.5 /version

   /dependency

 

  2、同步方式收发消息

  2.1、发送消息

  

public static void publish() {

 

   try {

   MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   /*会话清除标识

   * false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。

   * true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttConnectOptions.setKeepAliveInterval(10);

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   logger.info("发送完成:{}", token.isComplete());

   mqttClient.connect(mqttConnectOptions);

   for (int i = 0; i i++) {

   MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());

   mqttMessage.setQos(2);

   //是否保留消息,只能保留最新的一份

   mqttMessage.setRetained(false);

   mqttClient.publish(TOPIC_NAME, mqttMessage);

   mqttClient.disconnect();

   mqttClient.close();

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  2.2、接受消息

  

public static void subscribe(String clientId) {

 

   try {

   MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   //设为 false, 该订阅将被视为持久订阅

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   mqttClient.connect(mqttConnectOptions);

   mqttClient.subscribe(TOPIC_NAME);

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  2.3、完整例子

  

package com.abc.demo.emqx;

 

  import org.eclipse.paho.client.mqttv3.*;

  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

  import org.slf4j.Logger;

  import org.slf4j.LoggerFactory;

  public class MqttClientCase {

   private static Logger logger = LoggerFactory.getLogger(MqttClientCase.class.getName());

   private static final String SERVER_URI = "tcp://10.49.196.10:1883";

   private static final String TOPIC_NAME = "test-topic";

   public static void main(String[] args) throws Exception {

   new Thread(() - subscribe("client-subscribe-A")).start();

   new Thread(() - subscribe("client-subscribe-B")).start();

   Thread.sleep(1000);

   new Thread(() - publish()).start();

   Thread.sleep(1000 * 60);

   public static void publish() {

   try {

   MqttClient mqttClient = new MqttClient(SERVER_URI, "client-publish", new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   /*会话清除标识

   * false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。

   * true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttConnectOptions.setKeepAliveInterval(10);

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   logger.info("发送完成:{}", token.isComplete());

   mqttClient.connect(mqttConnectOptions);

   for (int i = 0; i i++) {

   MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());

   mqttMessage.setQos(2);

   //是否保留消息,只能保留最新的一份

   mqttMessage.setRetained(false);

   mqttClient.publish(TOPIC_NAME, mqttMessage);

   mqttClient.disconnect();

   mqttClient.close();

   } catch (Exception e) {

   e.printStackTrace();

   public static void subscribe(String clientId) {

   try {

   MqttClient mqttClient = new MqttClient(SERVER_URI, clientId, new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   //设为 false, 该订阅将被视为持久订阅

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   mqttClient.connect(mqttConnectOptions);

   mqttClient.subscribe(TOPIC_NAME);

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  MqttClientCase.java

  3、异步方式收发消息

  3.1、发送消息

  

public static void publish() {

 

   try {

   MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "client-publish", new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   /*会话清除标识

   * false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。

   * true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttConnectOptions.setKeepAliveInterval(10);

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   logger.info("发送完成:{}", token.isComplete());

   IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);

   iMqttToken.waitForCompletion();

   for (int i = 0; i i++) {

   MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());

   mqttMessage.setQos(2);

   //是否保留消息,只能保留最新的一份

   mqttMessage.setRetained(false);

   IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);

   iMqttDeliveryToken.waitForCompletion();

   mqttClient.disconnect().waitForCompletion();

   mqttClient.close();

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  3.2、接受消息

  

public static void subscribe(String clientId) {

 

   try {

   MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   //设为 false, 该订阅将被视为持久订阅

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);

   iMqttToken.waitForCompletion();

   mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  3.2、完整例子

  

package com.abc.demo.emqx;

 

  import org.eclipse.paho.client.mqttv3.*;

  import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

  import org.slf4j.Logger;

  import org.slf4j.LoggerFactory;

  public class MqttAsyncClientCase {

   private static Logger logger = LoggerFactory.getLogger(MqttAsyncClientCase.class.getName());

   private static final String SERVER_URI = "tcp://10.49.196.10:1883";

   private static final String TOPIC_NAME = "test-topic";

   public static void main(String[] args) throws Exception {

   new Thread(() - subscribe("client-subscribe-A")).start();

   new Thread(() - subscribe("client-subscribe-B")).start();

   Thread.sleep(1000);

   new Thread(() - publish()).start();

   Thread.sleep(1000 * 60);

   public static void publish() {

   try {

   MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, "test-client-publish", new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   /*会话清除标识

   * false:服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信。如果不存在这样的会话,服务器必须创建一个新会话。客户端和服务器在断开连接后必须存储会话的状态。

   * true:客户端和服务器必须丢弃任何先前的会话并创建一个新的会话。该会话的生命周期将和网络连接保持一致,其会话状态一定不能被之后的任何会话重用。

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttConnectOptions.setKeepAliveInterval(10);

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   logger.info("发送完成:{}", token.isComplete());

   IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);

   iMqttToken.waitForCompletion();

   for (int i = 0; i i++) {

   MqttMessage mqttMessage = new MqttMessage(("测试消息" + (i + 1)).getBytes());

   mqttMessage.setQos(2);

   //是否保留消息,只能保留最新的一份

   mqttMessage.setRetained(false);

   IMqttDeliveryToken iMqttDeliveryToken = mqttClient.publish(TOPIC_NAME, mqttMessage);

   iMqttDeliveryToken.waitForCompletion();

   mqttClient.disconnect().waitForCompletion();

   mqttClient.close();

   } catch (Exception e) {

   e.printStackTrace();

   public static void subscribe(String clientId) {

   try {

   MqttAsyncClient mqttClient = new MqttAsyncClient(SERVER_URI, clientId, new MemoryPersistence());

   MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

   //设为 false, 该订阅将被视为持久订阅

   mqttConnectOptions.setCleanSession(true);

   mqttConnectOptions.setUserName("admin");

   mqttConnectOptions.setPassword("123456".toCharArray());

   mqttClient.setCallback(new MqttCallback() {

   @Override

   public void connectionLost(Throwable cause) {

   logger.info("连接断开:{}", cause.getMessage());

   @Override

   public void messageArrived(String topic, MqttMessage message) throws Exception {

   logger.info("接受到消息:clientId={},topic={},message={},isRetained={}", clientId, topic, message.toString(), message.isRetained());

   @Override

   public void deliveryComplete(IMqttDeliveryToken token) {

   IMqttToken iMqttToken = mqttClient.connect(mqttConnectOptions);

   iMqttToken.waitForCompletion();

   mqttClient.subscribe(TOPIC_NAME, 2).waitForCompletion();

   } catch (Exception e) {

   e.printStackTrace();

  }

 

  MqttAsyncClientCase.java

  

  以上就是EMQX 入门实战(2)()的详细内容,想要了解更多 EMQX 入门实战(2)的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: