本篇文章为你整理了RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,包含有rabbitmq如何保证消息不丢 rabbitmq怎么保证消息不重复消费 rabbitmq消息积压如何解决 rabbitmq和kafka的区别 RabbitMQ,希望能帮助你了解 RabbitMQ。
流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
3.MQ实现
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
几种常见MQ的对比:
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
1.2.安装MQ
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
注意5672是MQ消息通信的端口,15672是ui端口
三、RabbitMQ
大致分为两种
1.**无交换机的 **基本消息队列 和 工作消息队列
2.有交换机的发布订阅,路由,主题
2.基于SpringAMQP使用
2.1基础消息队列
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("116.62.32.68");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
System.out.println("等待接收消息。。。。");
groupId org.springframework.boot /groupId
artifactId spring-boot-starter-amqp /artifactId
/dependency
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
2.2工作消息队列
消费者:
//工作队列
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1Message(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息:【" + msg + "】");
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue2Message(String msg) throws InterruptedException {
System.out.println("spring 消费者2接收到消息:【" + msg + "】");
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
listener:
direct:
prefetch: 1 # 消息预期,之前把所有消息预期导致平分消息,现在是处理一个取一个
2.3广播Fanout
多了一个交换机,交换机决定将生产者的消息发送给哪个队列,这决定就是交换机种类不同规则不同,因此分为三种,广播、路由和主题。
广播:只要队列绑定了交换机,就发送消息到队列
1.编写消费者:使用注解声明队列和交换机(同时绑定消费者的队列和交换机)
//3.广播(绑定队列,同时绑定交换机)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue1"),
exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
public void listenFanoutQueue1(String message){
System.out.println("广播方式队列1接受消息:"+message);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.queue2"),
exchange = @Exchange(name = "fanout.exange",type = ExchangeTypes.FANOUT)
public void listenFanoutQueue2(String message){
System.out.println("广播方式队列2接受消息:"+message);
2.生产者
//广播类型交换机
@Test
public void testFanoutExchangeQueue(){
//交换机名称
String exchangeName = "fanout.exange";
//消息
String message = "广播消息";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"",message);
2.4路由Route
和广播相比,在队列中多了一个key属性。
交换机通过消息的key来对应路由到相同key的队列上。
1.消费者:
/**
4.路由
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
public void listenDirectQueue1(String message){
System.out.println("路由红、蓝队列接受消息:"+message);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange",type = ExchangeTypes.DIRECT),
key = {"red","green"}
public void listenDirectQueue2(String message){
System.out.println("路由红、绿队列接受消息:"+message);
2.生产者
//路由类型交换机
@Test
public void testDirectExchangeQueue(){
//交换机名称
String exchangeName = "direct.exchange";
//消息
String message = "路由消息";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"red",message);
2.5主题Topic
key不再是一组单词,而是一个规则
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.spu.insert 或者 item.spu
item.*:只能匹配item.spu
1.消费者
/**
* 主题
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "china.#"
public void listenTopicQueue1(String message){
System.out.println("中国队列接受消息:"+message);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic.exchange",type = ExchangeTypes.TOPIC),
key = "#.news"
public void listenTopicQueue2(String message){
System.out.println("新闻队列接受消息:"+message);
2.生产者
//主题类型交换机
@Test
public void testTopicExchangeQueue(){
//交换机名称
String exchangeName = "topic.exchange";
//消息
String message = "china.lala消息";
//发送,第二个参数是队列的key,在路由类型中可以使用到
rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
3.消息转换器
Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
因此需要配置一个Json转换器
在publisher和consumer两个服务中都引入依赖:
dependency
groupId com.fasterxml.jackson.dataformat /groupId
artifactId jackson-dataformat-xml /artifactId
version 2.9.10 /version
/dependency
配置消息转换器。在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
以上就是RabbitMQ(rabbitmq怎么保证消息不重复消费)的详细内容,想要了解更多 RabbitMQ的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。