springcloud rabbitmq消息确认,rabbitmq发送消息确认
如何解决写爬虫IP受阻的问题?立即使用。
最近部门号召大家多组织技术分享会,说是为了搞活公司的技术氛围,但我知道这个T M只是为了刷KPI。然而,从另一方面来说,这确实是一件好事。与其开那些无聊的斗嘴会,多做技术交流对个人成长还是很有帮助的。
于是我主动报名分享,咳咳咳~,真的不是为了那个KPI,只是为了跟大家学习!
这次分享springboot rabbitmq如何实现消息确认机制,以及实际开发中踩坑的一点经验。其实整体内容比较简单,有时候事情就是这么神奇,越简单的东西越容易出错。
可以看出,使用RabbitMQ后,我们的业务链接明显更长了。虽然实现了系统间的解耦,但是可能导致消息丢失的场景也增加了。例如:
消息生成器-rabbitmq服务器(消息发送失败)
rabbitmq服务器本身出现故障,导致消息丢失。
消息使用者-rabbitmq服务(无法使用消息)
所以,如果能使用中间件,尽量不要使用。如果为了用而用,只会增加你的烦恼。开启消息确认机制后,虽然在很大程度上保证了消息的准确传递,但由于频繁的确认交互,rabbitmq的整体效率变低,吞吐量下降严重。对于不是很重要的消息,真的不建议你使用消息确认机制。
我们先实现springboot rabbitmq的消息确认机制,然后具体分析一下遇到的问题。
一、准备环境
1、引入 rabbitmq 依赖包
依赖性
groupIdorg.springframework.boot/groupId
在Artifact Spring-Boot-Starter-AMQP/Artifact ID/Dependency
2、修改 application.properties 配置
的配置中,需要开启发送方和消费方的消息确认。
spring . rabbit MQ . host=127 . 0 . 0 . 1 spring . rabbit MQ . port=5672 spring . rabbit MQ . username=guest
spring.rabbitmq.password=guest
#发送者打开确认机制。
spring . rabbit MQ . publisher-confirmations=true #发送方开启退货确认机制
spring . rabbit MQ . publisher-returns=true # # # # # # # # # # # # # # # # # # #
#在用户端设置手动确认
spring . rabbit MQ . listener . simple . acknowledge-mode=manual
#你支持重试吗?
spring . rabbit MQ . listener . simple . retry . enabled=true
3、定义 Exchange 和 Queue
定义交换机confirmTestExchange和队列confirm_test_queue,并将队列绑定到交换机。
@ configuration public class queue config {
@Bean(name=confirmTestQueue )
公共队列confirmTestQueue() {
返回新队列( confirm_test_queue ,true,false,false);
}
@ Bean(name= confirmTestExchange )
public fanout exchange confirmTestExchange(){
返回新的fanout exchange( confirmTestExchange );
}
@Bean公共绑定confirmTestFanoutExchangeAndQueue(
@ Qualifier( confirmTestExchange )fanout exchange confirmTestExchange,
@ Qualifier( confirmTestQueue )Queue confirmTestQueue){
返回binding builder . bind(confirmTestQueue)。to(confirmTestExchange);
}}
二、消息发送确认
Send Message Confirmation:用于确认生产者生产者将消息发送给代理,代理上的交易所将其传递给队列时,消息是否传递成功。
从生产者到rabbitmq代理的消息具有confirmCallback确认模式。
从exchange到队列的邮件传递失败具有returnCallback模式。
我们可以利用这两次回调来确保100%交付。
1、 ConfirmCallback确认模式
rabbit MQ代理收到消息后,将立即触发confirmCallback。
@Slf4j
@Componentpublic类ConfirmCallbackService实现RabbitTemplate。确认回调{
@ Override public void confirm(correlation data correlation data,boolean ack,String cause) {
如果(!确认){
Log.error(消息发送异常!);
}否则{
log.info(发送者爸爸已经收到确认,correlationData={},ack={},cause={} ,correlationData.getId(),ack,cause);
}
}}实现接口确认回拨,重写其确认()方法,方法内有三个参数相关数据,确认,原因。
相关数据:对象内部只有一个编号属性,用来表示当前消息的唯一性确认:消息投递到经纪人的状态,真的表示成功原因:表示投递失败的原因。但消息被经纪人接收到只能表示已经到达(法属)马提尼克岛(马提尼克岛的简写)服务器,并不能保证消息一定会被投递到目标长队里。所以接下来需要用到returnCallback
2、 ReturnCallback 退回模式
如果消息未能投递到目标长队里将触发回调返回回调,一旦向长队投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
@Slf4j
@Componentpublic类ReturnCallbackService实现兔子模板.返回回调{
@ Override public void返回的消息(Message Message,int replyCode,String replyText,String exchange,String routingKey) {
日志。info(返回消息===replyCode={ },replyText={},exchange={},routingKey={} ,回复代码,replyText,exchange,routing key);
}}实现接口返回回调,重写returnedMessage()方法,方法有五个参数消息(消息体)、replyCode(响应code)、replyText(响应内容)、交换(交换机)、路由键(队列)。
下边是具体的消息发送,在兔子模板中设置确认和返回回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个相关数据对象,添加一个编号为10000000000。
@ Autowired私兔模板兔模板;
@ auto wired private ConfirmCallbackService ConfirmCallbackService;
@ auto wired private ReturnCallbackService ReturnCallbackService;
公共void sendMessage(字符串交换、字符串路由密钥、对象消息){
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置发布者-返回:真
*/
兔子模板。设置强制(真);
/**
* 消费者确认收到消息后,手动确认字符(确认字符)回执回调处理
*/
兔子模板。setconfirmcallback(confirmCallbackService);
/**
* 消息投递到队列失败回调处理
*/
兔子模板。setreturncallback(returnCallbackService);
/**
* 发送消息
*/
兔子模板。convertandsend(交换,路由密钥,消息,
消息- {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode .持久);
返回消息;
},
新的相关数据(uuid。随机uuid().toString()));
}
三、消息接收确认
消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(确认)的过程。使用@RabbitHandler注解标注的方法要增加频道(信道)、消息两个参数。
@Slf4j
@组件
@ rabbit listener(queues= confirm _ test _ queue )公共类接收消息1 {
@ rabbit handler public void process handler(字符串msg,通道Channel,消息Message)抛出IOException {
尝试{
log.info(小富收到消息:{} ,msg);
//TODO具体业务
渠道。基本确认(消息。getmessageproperties().getDeliveryTag(),false);
} catch(异常e) {
如果(消息。getmessageproperties().get rede delivered()){
log.error(消息已重复处理失败,拒绝再次接收.);
渠道。基本项目(消息。getmessageproperties().getDeliveryTag(),false);//拒绝消息
}否则{
log.error(消息即将再次返回队列处理.);
渠道。基本nack(消息。getmessageproperties().getDeliveryTag(),false,true);
}
}
}}消费消息有三种回执方式。我们来分析一下每种方法的意义。
1、basicAck
基本确认:表示确认成功。使用这种回执方式后,消息会被rabbitmq broker删除。
Voidbasick (Long deliveryTag,Boolean Multiple) deliveryTag:表示消息传递序列号,每次消耗或重新传递消息时,Delivery Tag都会增加。在手动消息确认模式下,我们可以执行ack、nack、reject等操作。在具有指定deliveryTag的邮件上。
多个:是否批量确认;如果该值为true,所有小于当前消息deliveryTag的消息将被一次性确认。
举个栗子:假设我先发三条消息,deliveryTag分别是5,6,7,但是都没有确认。当我发送第四条消息时,当deliveryTag为8,multiple设置为true时,5、6、7、8的所有消息都会被确认。
2、basicNack
基本确认:表示失败确认。这种方法一般在消费消息业务异常时使用,消息可以重新交付到队列中。
Void基本NACK(长递送标记、布尔型多重、布尔型请求)递送标记:指明消息递送序列号。
多个:是否批量确认。
Requeue:值为true的消息将被重新排队。
3、basicReject
basicReject: Reject消息,它与basicNack的不同之处在于无法执行批处理操作。其他用法也差不多。
Void基本拒绝(长递送标记,布尔队列)递送标记:表示消息递送序列号。
Requeue:值为true的消息将被重新排队。
四、测试
发送消息,测试消息确认机制是否有效。从执行结果来看,发送方发送消息后成功回调,消费方成功消费消息。
使用数据包捕获工具Wireshark观察rabbitmq amqp协议交互的变化,也有更多的ack过程。
五、踩坑日志
1、不消息确认
这是一个非常不熟练的坑,但却是一个非常容易出错的地方。
开启消息确认机制,消费消息时不要忘记channel.basicAck,否则消息会一直存在,导致重复消费。
2、消息无限投递
我第一次接触消息确认机制的时候,消费端的代码是这样写的。想法很简单:在处理业务逻辑后确认消息,在int a=1/0异常后将消息放回队列。
@ rabbit handler public void process handler(String msg,Channel channel,Message message)抛出IOException {
尝试{
Log.info(消费者2收到:{} ,msg);
int a=1/0;
channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);
} catch(异常e) {
channel . basic nack(message . getmessageproperties()。getDeliveryTag(),false,true);
}
}但是有一个问题。99.9%的业务代码一旦出现bug,不会自动修复。消息将被无限期地传递到队列,使用者将无限期地执行它,从而导致无限循环。
本地CPU瞬间被占满。你可以想象当服务在生产环境中崩溃时,我有多惊慌。
而rabbitmq管理层只有一条未经证实的消息。
经过测试和分析,发现当消息被重新传递到消息队列时,消息不会返回到队列的末尾,而仍然会在队列的头部。
消费者将立即消费该消息,业务流程将抛出异常,该消息将重新加入团队,等等。消息队列处理被阻止,正常消息无法运行。
当时我们的解决方案是先回复消息,然后消息队列会删除消息。同时,我们再次将消息发送到消息队列,将异常消息放在消息队列的末尾,既保证了消息不会丢失,又保证了业务的正常进行。
channel . basic ack(message . getmessageproperties()。getDeliveryTag(),false);//将消息重新发送到队列的末尾。getreceivedxchange(),
message.getMessageProperties()。getReceivedRoutingKey(),MessageProperties。PERSISTENT_TEXT_PLAIN,
JSON . tojsonbytes(msg));但是这种方法并没有解决根本问题,仍然会时不时的报错。稍后,优化消息重试的次数。达到重试限制后,消息会被人工确认,被队列删除,持久化在MySQL中并推送到闹钟,由调度任务进行人工处理和补偿。
3、重复消费
如何保证MQ的消耗是幂等的?这个需要根据具体业务来确定。可以用MySQL或redis持久化消息,然后验证消息中的唯一性属性。以上是springboot rabbitmq如何用消息确认的细节。更多请关注我们的其他相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。