rabbitmq5种工作模式,rabbitmq6种工作模式

  rabbitmq5种工作模式,rabbitmq6种工作模式

  本文主要介绍python操作RabbitMq的三种工作模式。有需要的朋友可以借鉴一下,希望能有所帮助。祝大家进步很大,早日升职加薪。

  00-1010一.导言:二.兔子的生产和消费。RabbitMq持久性四。RabbitMq发布和订阅模式I:扇出模式II:直接模式III: topicd

  

目录

  RabbitMq是一个实现高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是应用程序间通信的一种方式。通过编写消息,一个应用程序将消息传输到队列,另一个应用程序读取该消息以完成通信。作为一个中间件,RabbitMq无疑是目前最流行的消息队列之一。

  RabbitMq的应用场景非常广泛:

  系统高可用:日常生活中各种商场的秒杀,高流量,高并发。当服务器接收到如此大量的处理业务的请求时,就有停机的风险。有些业务可能极其复杂,但这部分没有时效性,不需要马上反馈给用户。我们可以把这部分处理请求扔给队列,让程序进行后处理,这样就减轻了高并发场景下服务器的压力。往往需要考虑消息队列在分布式系统、集成系统、子系统之间的对接、架构设计中的应用。

  

一、简介:

  生产者:队列消息的生产者,负责产生消息并将其发送到队列。

  进口鼠兔

  导入json

  credentials=pika . plain credentials(香波, 123456) # mqusername和密码

  #虚拟队列需要指定参数virtual_host。如果是默认值,您可以将其留空。

  连接=鼠兔。BlockingConnection(pika。ConnectionParameters(主机=10.1.62.170 ,端口=5672,虚拟主机=/,凭据=凭据))

  channel=connection.channel()

  #声明消息将被传递到的消息队列,如果不存在,则创建它。

  result=channel . queue _ declare(queue= python-test )

  对于范围(10):内的I

  message=JSON . dumps({ OrderId : 1000% s % I })

  #将值routing_key插入队列是队列名。

  channel . basic _ publish(exchange= ,routing_key=python-test ,body=message)

  打印(消息)

  connection.close()

  消费者:队列消息的接收者,负责接收和处理消息队列中的消息。

  进口鼠兔

  凭据=鼠兔。普通凭据(洗发水, 123456 )

  连接=鼠兔。BlockingConnection(pika。ConnectionParameters(主机=10.1.62.170 ,端口=5672,虚拟主机=/,凭据=凭据))

  channel=connection.channel()

  #声明传递消息的消息队列,如果队列不存在,则创建一个队列。

  channel . queue _ declare(queue= python-test ,durable=False)

  #定义一个回调函数来处理消息队列中的消息,这里它被打印出来

  定义回调(通道、方法、属性、主体):

  chbasic _ ack(delivery _ tag=method . delivery _ tag)

  print(body.decode())

  #告诉rabbitmq使用回调来接收消息

  channel . basic _ consume( python-test ,回调)

  #开始接收信息,并进入阻塞状态。只有当队列中有信息时,才会调用回调进行处理。

  channel.start_consuming()

  

二、RabbitMq 生产和消费

  MQ默认建立临时队列和E。

  xchange,如果不声明持久化,一旦 rabbitmq 挂掉,queue、exchange 将会全部丢失。所以我们一般在创建 queue 或者 exchange 的时候会声明 持久化。

  1.queue 声明持久化

  

# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储

  result = channel.queue_declare(queue = python-test,durable = True)

  

  2.exchange 声明持久化

  

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储

  channel.exchange_declare(exchange = python-test, durable = True)

  

  注意:如果已存在一个非持久化的 queue 或 exchange ,执行上述代码会报错,因为当前状态不能更改 queue 或 exchange 存储属性,需要删除重建。如果 queue 和 exchange 中一个声明了持久化,另一个没有声明持久化,则不允许绑定。

  3.消息持久化

  虽然 exchange 和 queue 都申明了持久化,但如果消息只存在内存里,rabbitmq 重启后,内存里的东西还是会丢失。所以必须声明消息也是持久化,从内存转存到硬盘。

  

# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化

   channel.basic_publish(exchange = ,routing_key = python-test,body = message,

   properties=pika.BasicProperties(delivery_mode = 2))

  

  4.acknowledgement 消息不丢失

  消费者(consumer)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息丢失。但是也可以选择消费者处理失败时,将消息回退给 rabbitmq ,重新再被消费者消费,这个时候需要设置确认标识。

  

channel.basic_consume(callback,queue = python-test,

  # no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉

   no_ack = False)

  

  

  

四、RabbitMq 发布与订阅

  rabbitmq 的发布与订阅要借助交换机(Exchange)的原理实现:

  

  Exchange 一共有三种工作模式:fanout, direct, topicd

  

  

模式一:fanout

  这种模式下,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。

  

  • 不需要指定 routing_key ,即使指定了也是无效。
  • 需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
  • 需要先启动订阅者,此模式下的队列是 consumer 随机生成的,发布者仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。

  发布者:

  

import pika

  import json

  credentials = pika.PlainCredentials(shampoo, 123456) # mq用户名和密码

  # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。

  connection = pika.BlockingConnection(pika.ConnectionParameters(host = 10.1.62.170,port = 5672,virtual_host = /,credentials = credentials))

  channel=connection.channel()

  # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

  channel.exchange_declare(exchange = python-test,durable = True, exchange_type=fanout)

  for i in range(10):

   message=json.dumps({OrderId:"1000%s"%i})

  # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置

   channel.basic_publish(exchange = python-test,routing_key = ,body = message,

   properties=pika.BasicProperties(delivery_mode = 2))

   print(message)

  connection.close()

  

  订阅者:

  

import pika

  credentials = pika.PlainCredentials(shampoo, 123456)

  connection = pika.BlockingConnection(pika.ConnectionParameters(host = 10.1.62.170,port = 5672,virtual_host = /,credentials = credentials))

  channel = connection.channel()

  # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除

  result = channel.queue_declare(,exclusive=True)

  # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

  channel.exchange_declare(exchange = python-test,durable = True, exchange_type=fanout)

  # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去

  channel.queue_bind(exchange = python-test,queue = result.method.queue)

  # 定义一个回调函数来处理消息队列中的消息,这里是打印出来

  def callback(ch, method, properties, body):

   ch.basic_ack(delivery_tag = method.delivery_tag)

   print(body.decode())

  channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉

   auto_ack = False)

  channel.start_consuming()

  

  

  

模式二:direct

  这种工作模式的原理是 消息发送至 exchange,exchange 根据路由键(routing_key)转发到相对应的 queue 上。

  

  • 可以使用默认 exchange =' ' ,也可以自定义 exchange
  • 这种模式下不需要将 exchange 和 任何进行绑定,当然绑定也是可以的。可以将 exchange 和 queue ,routing_key 和 queue 进行绑定
  • 传递或接受消息时 需要指定 routing_key
  • 需要先启动订阅者,此模式下的队列是 consumer 随机生成的,发布者仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。

  发布者:

  

import pika

  import json

  credentials = pika.PlainCredentials(shampoo, 123456) # mq用户名和密码

  # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。

  connection = pika.BlockingConnection(pika.ConnectionParameters(host = 10.1.62.170,port = 5672,virtual_host = /,credentials = credentials))

  channel=connection.channel()

  # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

  channel.exchange_declare(exchange = python-test,durable = True, exchange_type=direct)

  for i in range(10):

   message=json.dumps({OrderId:"1000%s"%i})

  # 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化

   channel.basic_publish(exchange = python-test,routing_key = OrderId,body = message,

   properties=pika.BasicProperties(delivery_mode = 2))

   print(message)

  connection.close()

  

  消费者:

  

import pika

  credentials = pika.PlainCredentials(shampoo, 123456)

  connection = pika.BlockingConnection(pika.ConnectionParameters(host = 10.1.62.170,port = 5672,virtual_host = /,credentials = credentials))

  channel = connection.channel()

  # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除

  result = channel.queue_declare(,exclusive=True)

  # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

  channel.exchange_declare(exchange = python-test,durable = True, exchange_type=direct)

  # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去

  channel.queue_bind(exchange = python-test,queue = result.method.queue,routing_key=OrderId)

  # 定义一个回调函数来处理消息队列中的消息,这里是打印出来

  def callback(ch, method, properties, body):

   ch.basic_ack(delivery_tag = method.delivery_tag)

   print(body.decode())

  #channel.basic_qos(prefetch_count=1)

  # 告诉rabbitmq,用callback来接受消息

  channel.basic_consume(result.method.queue,callback,

  # 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉

   auto_ack = False)

  channel.start_consuming()

  

  

  

模式三:topicd

  这种模式和第二种模式差不多,exchange 也是通过 路由键 routing_key 来转发消息到指定的 queue 。 不同点是routing_key 使用正则表达式支持模糊匹配,但匹配规则又与常规的正则表达式不同,比如#是匹配全部,*是匹配一个词。

  举例:routing_key =#orderid#,意思是将消息转发至所有 routing_key 包含 orderid 字符的队列中。代码和模式二 类似,就不贴出来了。

  以上就是python操作RabbitMq的三种工作模式的详细内容,更多关于python操作RabbitMq工作模式的资料请关注盛行IT软件开发工作室其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: