自从搭建好博客基本没写过什么,这篇关于RabbitMQ的博客已经拖了很久了,刚好放假总结一下,希望能为16年开个好头。

这是2015年参与一个合作项目时,在他们的项目中用到了RabbitMQ作为消息转发中间件,这也是我后续开发的新功能的基础,下面就将我对RabbitMQ的学习做个简单的总结。

类似的消息队列还有很多,如kalkaZeroMQ等,使用时可先结合自己的使用场景对比,选择最合适的。

相关资料

Rabbitmq官方文档

  • 非常非常好的学习文档,通过其中的六节教程可以完成RabbitMQ的学习,配图清晰明了,本文也借用了其中很多的图片。

pika官方文档

  • 官方推荐的Python库,用于在Python中访问RabbitMQ,具体的操作函数可在其中查看;
  • pika 是 AMQP(Advanced Message Queuing Protocol) 协议的一种实现;
  • 关于Python连接RabbitMQ还有其他库,如py-amqppy-amqplib

其他参考资料

安装RabbitMQ

安装Erlang

  • RabbitMQ基于Erlang开发,因此需要安装Erlang。

Windows版本

  • 下载安装包安装即可。

Linux:

  • Ubuntu下执行命令:sudo apt-get install rabbitmq-server
  • 在RabbitMQ下添加test账户,并赋值其权限为Administrator:
    • rabbitmqctl add_user test test
    • rabbitmqctl set_user_tags test administrator
  • 若有必要,则执行指令,为test用户分配/目录权限,具体可参考下面的图片说明:rabbitmqctl set_permissions -p / test ".*" ".*" ".*"

管理RabbitMQ

命令行

笔者只会一些简单的常用命令查看RabbitMQ的信息,但已经满足了我的开发需求,具体如下:

  • 查看所有的exchange:rabbitmqctl list_exchanges
  • 查看所有的queue:rabbitmqctl list_queues
  • 查看所有的用户:rabbitmqctl list_users
  • 查看所有的绑定(exchange和queue的绑定信息):rabbitmqctl list_bindings
  • 查看消息确认信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged
  • 查看RabbitMQ状态,包括版本号等信息:rabbitmqctl status

Web管理插件

该方式操作简单,功能强大且直观,前提是启动Web插件,然后即可通过浏览器查看。
启动Web管理插件的命令(官方资料): rabbitmq-plugins enable rabbitmq_management
下图是RabbitMQ Web管理界面:

RabbitMQ原理及基本功能介绍

基本通信模型

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

符号说明:

  • P(Producer):发送者,生产消息
  • X(Exchange):消息交换器,用于向Queue发布消息,类似于路由器
  • amq.XX(Queue):消息队列,用于存储消息
  • C(Consumer):消费者,接收并处理消息

RabbitMQ的部分实现机制

  1. Round-robin dispatching(循环分发)
    • 即RabbitMQ会循环将消息分发给每一个消费者;
    • 若启用该机制,则假如有n个消费者,那么RabbitMQ收到n个消息时,会逐个给每个消费者发送一个消息;
  2. Message acknowledgment(消息确认机制-ack)
    • 当打开确认机制(默认打开,可通过参数no_ack=True关闭)时消费者在处理消息的过程中挂掉时,RabbitMQ会将该消息发送给其他消费者;
    • 开启确认模式时,消费者必须在处理完消息是发送确认消息,否则该消息会被重新发送给一个消费者(可以看作是随机发送),如此下去RabbitMQ将吃掉更多的内存;
  3. Message durability(消息持久化)
    • 若开启该功能,则在RabbitMQ Crash的情况下不丢失消息;
    • 但这不能完全保证所有消息不丢失,如在RabbitMQ收到消息到将其存储到disk的期间发生crash,将丢失这期间收到的所有消息;
  4. Fair dispatch(公平的分发)
    • RabbitMQ默认收到消息时直接分发给对应消费者,而不考虑任务大小;
    • 为了能够公平的分发消息,也就是考虑任务大小,可以配置channel.basic_qos(prefetch_count=1),保证接受者处理完消息,并且返回ack消息时才为其派发下一个消息;从而避免某些消费者很忙,某些很闲的情况;
    • :需要保证消息不超过消息队列的大小:如处理每个消息需要1分钟,而每秒钟就会产生一个消息,那么这种情况下很快就会出现消息数量超过queue size的情况;

Exchange和Queue的一些说明

  1. 默认Exchang
    • 定义exchange时,若名字为空,则使用默认exchange;
  2. 临时Queue
    • result=channel.queue_declare()
    • result=channel.queue_declare(exclusive=True)
    • 上面两句代码即可生成一个无名临时队列,当断开消费者时,该队列也会被删除;
  3. 绑定Queue和Exchange
    • 队列绑定到exchange上才可以接收exchange分发的消息;
    • channel.queue_bind(exchange='logs',queue=result.method.queue),这句话即可将前面的无名队列绑定到名为logs的exchange;
    • 绑定成功之后,exchange收到消息时就会广播给和其绑定的所有消费者;

RabbitMQ的四种工作模式

RabbitMQ并不是直接将消息发送到queue,而是将消息发送给exchange,然后exchange再将消息发送到对应的queue;
exchange共有四种模式:fanout, direct, topic and headers

fanout

exchange将消息发送给和该exchange连接的所有queue;也就是所谓的广播模式;此模式下忽略routing_key;

direct

路由模式,通过routing_key将消息发送给对应的queue;
如下面这句即可设置exchange为direct模式,只有routing_key为“black”时才将其发送到队列queue_name;
channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key='black')

在上图中,Q1和Q2可以绑定同一个key,如绑定routing_key=‘KeySame’,那么收到routing_key为KeySame的消息时将会同时发送给Q1和Q2,退化为广播模式;

topic

topic模式类似于direct模式,只是其中的routing_key变成了一个有“.”分隔的字符串,“.”将字符串分割成几个单词,每个单词代表一个条件;

  • * (star) can substitute for exactly one word.
  • # (hash) can substitute for zero or more words.

其中整个字符串是一条路由规则,通过*和#即可实现灵活的路由,上图的解释如下:

  • Q1 is interested in all the orange animals.
  • Q2 wants to hear everything about rabbits, and everything about lazy animals.

注:

  • 一条消息可有同时满足多个规则,并分发给多个Queue,如:quick.orange.rabbit;
  • 通过修改路由规则,topic可以退化为fanout和direct模式;

headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

  • 在绑定Queue与Exchange时指定一组键值对;
  • 当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;
  • 如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

RabbitMQ的使用

一些使用场景

  1. 日志记录

    • 各个程序可以向同一个Rabbitmq的exchange发送消息,exchange根据routing_key分发消息到不同的队列,然后再由不同的程序接受者处理;
    • 其中routing_key可以设置成log等级,这样子可以对log实现分级处理;比如下面的两个场景:
      • 对于error的log可以报警;
      • 只有info级别以上的消息才记录到disk;
    • 通过topic模式能实现更灵活的控制,具体看官方文档即可;
  2. 消息分发服务

    • 在RabbitMQ不是瓶颈的情况下是很好的选择。

RabbitMQ在我们项目中的用法

下图中展示了整个系统的架构以及RabbitMQ在其中的作用和相关配置:

  • L1、L2、L3:分别是三个服务器程序,同一套代码的三个运行实例,下面简称 L服务器;
  • Queue-1、Queue-2、Queue-3:同一套代码的三个运行实例,它们的作用是将RabbitMQ中发过来的Json数据转换成L服务器能够识别的数据,主要作用是数据格式转化和消息转发;
  • Tool1、Tool2:两个不同应用程序,都需要和L服务器进行交互;
  • 图中分为上下两部分,上半部分是Tool1和Tool2向L服务器发送请求的流程,其中用到两个Exchange,一个模式时fanout,另一个是direct;
  • 图中的下半部分是Tool1和Tool2接收来自L服务器的数据,通过一个fanout类型的Exchange广播,具体过滤工作由Tool程序自己完成。

注意事项及解决方法

Queue和Exchange不支持修改参数

RabbitMQ不允许重新定义同名但是参数不同的queue,也就是不支持修改已有queue的属性;笔者曾经因为这个错误浪费了很多时间;

例子
在调用exchange_declare函数时,对于同一个exchange必须保证他们的参数完全相同,否则会出错;
No handlers could be found for logger "pika.adapters.blocking_connection" pika.exceptions.ChannelClosed: (None, None)

pika.exceptions.IncompatibleProtocolError

原因:协议错误,RabbitMQ版本不匹配;
解决方法:升级RabbitMQ,一般是升级到3.X即可;

pika.exceptions.ProbableAuthenticationError

原因:权限验证错误
解决办法:

  • 检查连接的账户是否创建成功;
  • 检查账户权限是否合适;
  • guest账户只能用于本地连接RabbitMQ,远程连接必须创建新账户;

This is because you are trying to authenticate using the username and password guest remotely. Starting with RabbitMQ 3.3 you need to create a new account to use remotely, and guest/guest can only be used locally.