RabbitMQ个人实践()

  本篇文章为你整理了RabbitMQ个人实践()的详细内容,包含有 RabbitMQ个人实践,希望能帮助你了解 RabbitMQ个人实践。

  MQ(Message Queue)就是消息队列,其有点有很多:解耦、异步、削峰等等,本文来聊一下RabbitMQ的一些概念以及使用。

  RabbitMq

  Springboot整合RabbitMQ简单案例

  Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

  Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

  Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

  Producer:消息生产者,就是投递消息的程序。

  Consumer:消息消费者,就是接受消息的程序。

  发布消息到RabbitMQ需要经过两步:

  producer → exchange

  exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

  了解了RabbitMQ的一些概念,我们来捋捋使用RabbitMQ的流程:

  创建Exchange

  创建Queue

  将Queue绑定进Exchange中(此处会设置routing key)

  生产者发布消息

  消费者订阅消息

  交换机(Exchange)

  交换机可以绑定队列,绑定时可以给队列指定路由(Routing key)和参数(Arguments)

  所有的消息发送都是经过交换机转发到队列的,而不是直接到队列中

  交换机类型:

  
direct

  根据确定的路由(routing key)转发消息到队列中(一条消息可以发到多个队列,只要路由相同)

  
#表示任意数量(零个或多个)单词

  例如:如果队列的路由为com.# 那么往交换机发消息是,路由填com.ccc 队列就可以收到消息

  
消息过期时间TTL

  有两种方式设置TTL,创建队列时设置整个队列的TTL或者在发送消息时单独设置每条消息的TTL,消息存活时间取两者的最小值。

  
Map String, Object args = new HashMap ();

   args.put("x-message-ttl", 5000); // 设置队列中的消息5秒过期

   return new Queue("queueName",true, false, false, args);

  

 

 

  
发送消息时设置

  

public void makeOrder(String userid,String productid,int num){

 

   String exchangeName = "ttl_exchange";

   String routingKey = "ttlmessage";

   //给消息设置过期时间

   MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){

   public Message postProcessMessage(Message message){

   // 设置消息5秒过期

   message.getMessageProperties().setExpiration("5000");

   return message;

   rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor);

  

 

  
死信队列也是一个正常队列,只是当绑定了死信队列的队列满足相应条件,就会将满足条件的消息转移到死信队列中。

  进入死信队列的条件:

  消息被拒绝

  消息过期(超时)

  队列达到最大长度

  死信队列的配置:

  
给需要绑定死信队列的队列添加x-dead-letter-exchange(死信队列的交换机)和x-dead-letter-routing-key(死信队列的路由)参数

  

@Bean

 

  public Queue queue(){

   Map String, Object args = new HashMap ();

   args.put("x-dead-letter-exchange", "死信队列交换机名称");

   args.put("x-dead-letter-routing-key", "死信队列路由");

   return new Queue("queueName",true, false, false, args);

  

 

  
producer → exchange
 

  exchange 根据 exchange 的类型和 routing key 确定将消息投递到哪个队列

  所以,发布消息的确认也分两个部分,以下是确认步骤:

  
* PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。

   * Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。

   * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。

   * PostConstruct在构造函数之后执行,init()方法之前执行。

   * Constructor(构造方法) - @Autowired(依赖注入) - @PostConstruct(注释的方法)

  @PostConstruct

  private void regCallBack() {

   // producer - exchange 成功或失败都会触发此回调

   rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

   @Override

   public void confirm(CorrelationData correlationData, boolean ack, String cause) {

   // 这个id是在消息发送的时候传入的

   String id = correlationData.getId();

   // 如果ack为true代表消息被mq成功接收

   if (!ack) {

   // 应答失败,修改日志状态

   System.out.println("exchange 应答失败,做失败处理!");

   } else {

   // 应答成功,修改日志状态

   System.out.println("exchange 成功处理");

   // 这个回调只有exchange - queue 失败时才会触发

   rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

   @Override

   public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

   System.out.println("exchange - queue 发送失败");

  

 

 

  


String correlationId = "这是日志id";

 

  rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() {

   @Override

   public Message postProcessMessage(Message message) throws AmqpException {

   // 消费者需要correlationId才做这个处理

   message.getMessageProperties().setCorrelationId(correlationId);

   return message;

  }, new CorrelationData(correlationId));

  // 如果消费者不需要获取correlationId,则用下面这种即可

  rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));

  

 

  


@RabbitListener(queues = {"队列名字"})

 

  public void messageConsumer(String orderMsg, Channel channel, @Headers Map String,Object headers) throws Exception{

   // 需要producer做相应处理,consumer才能拿到correlationId

   String correlationId = messages.getMessageProperties().getCorrelationId();

   System.out.println("消息为:" + orderMsg);

   long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString());

   try {

   // 消费成功,进行确认

   channel.basicAck(tag, false);

   } catch (IOException e) {

   // 消费失败,重发

   // requeue代表是否重发,为false则直接将消息丢弃,有死信就进入死信队列

   channel.basicNack(tag, false, true);

  

 

  
本文介绍了RabbitMQ的一些概念和简单使用,有不少东西其实是没有讲清楚的,比如publisher-confirm-type和acknowledge-mode的几种类型的区别等等。主要是在官方文档找不到相关的细致描述,查文档的能力还是有待提高。。。

  参考资料

  RabbitMq 技术文档 - 腾讯云开发者社区-腾讯云 (tencent.com)

  Spring AMQP

  以上就是RabbitMQ个人实践()的详细内容,想要了解更多 RabbitMQ个人实践的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: