java mq消息队列,java接收rabbitmq消息

  java mq消息队列,java接收rabbitmq消息

  

目录

工作队列1。轮询分发消息1.1 抽取工具类1.2 编写两个工作线程1.3 编写生产者1.4 运行测试1.5 异常情况2.消息应答2.1 自动应答2.2 手动应答2.3 消息自动重新入队2.4 手动应答测试2.4.1 生产者代码2.4.2 消费者代码2.4.3 测试总结

 

  

Work Queues

工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

 

  其实就是生产者发送大量的消息,发送到队列之后,由多个消费者(工作线程)来处理消息,并且每个消息只能被处理一次。

  

1. 轮询分发消息

多个工作线程按照次序每来一个消息执行一次。

 

  

1.1 抽取工具类

直接通过信息获取信道

 

  /* * * * @描述兔子q工具类* @ date 2022/3/5 10:02 */public类rabbit mqutils {公共静态通道获取通道()抛出异常{连接工厂factory=新连接工厂();工厂。设置主机(“1”);工厂。设置用户名(“guest”);工厂。设置密码(“guest”);连接连接=工厂。新建连接();返回联系。创建通道();}}

  

1.2 编写两个工作线程

工作2和工作一代码没有区别,只需要对它做出区分即可。

 

  公共类工人1 { //指定队列名称私有静态最终字符串QUEUE _ NAME= work _ queue公共静态void main(String[] args)引发异常{ //获取信道channel channel=rabbitmqutils。获取通道();//声明:接收消息回调deliver callback deliver callback=(消费者标签,消息)- { System.out.println(工作线程01:“新字符串(消息。getbody());};//声明:取消消费回调取消回拨取消回拨=消费者标签-{ system。出去。println(工作线程01取消接收:‘消费者标签);};System.out.println(工作线程01启动完成.);渠道。基本消费(QUEUE _ NAME,false,deliverCallback,cancel callback);}}

  

1.3 编写生产者

公共类生产者{私有静态最终字符串QUEUE _ NAME= work _ queue公共静态void main(String[] args)引发异常{ Channel Channel=rabbitmqutils。获取通道();//产生队列渠道。QUEUE declare(QUEUE _ NAME,false,false,true,null);//消息体

 

   Scanner scanner = new Scanner(System.in); int i = 1; while (scanner.hasNext()){ String msg = scanner.next(); msg = msg + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("发送成功:" + msg); } System.out.println("----------==========发送完毕==========----------"); }}

 

  

1.4 运行测试

先启动两个工作线程,再启动生产者。

 

  

出现404异常请参考下方1.6

 

  生产者发送情况:

  

 

  轮询状态下两个工作队列接收状态:

  

 

  

 

  

 

  

1.5 异常情况

在先启动两个消费者线程时,会提示404找不到队列

 

  

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue work_queue in vhost /, class-id=60, method-id=20)

发生这个情况的原因很显然是因为先启动了消费者,但是在RabbitMQ中没有创建相对应的队列名称,解决方法可以:

 

  1.先启动生产者创建队列(也可以在RabbitMQ中创建队列);

  2.再启动消费者就不会产生这个错误;

  3.再在生产者中使用Scanner类去发送消息测试。

  

 

  

2. 消息应答

消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以删除消息。其目的就是为了保护消息在被处理之前不会消失。

 

  

 

  

2.1 自动应答

这种方式发送后就被认定为已经传送成功,所以在消息接收到之前消费者的连接或者channel关闭,那么这个消息就会丢失。其特点是消费者可以传递过载的消息,对传递的消息没有限制,但如果因内存耗尽消费者线程被系统杀死,就会使得多条消息丢失。所以这个模式需要在数据安全性和吞吐量之间选择,适合使用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

 

  所以自动应答的方式局限性很高。

  

 

  

2.2 手动应答

优点:可以批量应答和减少网络拥挤。

 

  1.channel.basicAck(long deliveryTag, boolean multiple);:肯应应答,处理完消息之后提醒RabbitMQ可以删除当前队列,deliveryTag:当前队列中选中的消息;multiple:是否批量应答。

  2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):否定应答,

  3.channel.basicReject(long deliveryTag, boolean requeue):否定并且拒绝应答。

  

 

  

2.3 消息自动重新入队

如果消费者因为一些原因失去了对RabbitMQ的连接,导致没有发送ACK确认,RabbitMQ就会对该消息进行重新排队,并且分发给可以处理该消息的消费者,所以即使某个消费者死亡,也可以保证消息不会丢失。

 

  

 

  

2.4 手动应答测试

测试目的:在手动应答状态下不会发生消息丢失的情况。

 

  

测试方法:

 

  1.创建两个消费者;

  2.使用工具类使线程睡眠一定时间;

  3.在睡眠时关闭线程,看能否自动重新入队。

  

 

  

2.4.1 生产者代码

/** * @Description 手动应答生产者 * @date 2022/3/5 19:03 */public class Producer1 { // 指定队列名 private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:" + msg + "成功"); } }}

 

  

2.4.2 消费者代码

/** * @Description 手动应答消费者1 * @date 2022/3/5 19:17 */public class Worker1 { private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); System.out.println("线程A等待接收......"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡一秒 try { Thread.sleep(1000); System.out.println("线程A接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); /** * basicAck: * 1. 消息标记 * 2. 是否批量 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); }}

Worker2类和1区别不大,将名称改成B再将睡眠事件改成30即可。

 

  

 

  

2.4.3 测试

测试方法:

 

  1.先启动生产者创建队列;

  2.启动两个消费者接收消息;

  3.因为是轮询方式,所以A线程接收之后肯定是B线程接收,在睡眠时关闭B线程,如果A线程接收到说明测试成功。

  发送消息:

  

 

  线程A接收:

  

 

  再发送消息:

  

 

  关闭线程B线程A接收到消息:

  

 

  测试成功!

  

 

  

总结

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注盛行IT的更多内容!

 

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: