深入浅出RabbitMQ

RabbitMQ是一个由Erlang语言开发的AMQP(Advanced Message Queuing Protocol)的开源实现。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ是当前最主流的消息中间件之一,在了解RabbitMQ之前我们需要先熟悉AMQP相关知识。

AMQP角色划分

  • 生产者(producer):产生消息的应用,能够传递消息到消息中间件的应用。
  • 消息中间件(brokers):消息传递的中间载体,例如RabbitMQ。
  • 消费者(consumers):接收并处理消息的应用,从消息中间件获取消息并处理。

消息中间件相关概念

  • 消息队列(Queue):存储还未被消费者消费的消息的容器。
  • 消息(Message): 生产者产生的和消费者处理的消息。
  • 交换机(Exchange):交换机接收生产者发出的消息并且路由到由交换机类型和被称作绑定(bindings)的规则所决定的到队列中,交换机不存储消息。
  • 路由键(Routing key):路由关键字,交换机Exchange的路由规则利用这个关键字进行消息投递到消息队列。
  • 绑定键(Binding Key):Binding Key可以理解为交换机Exchange和消息队列Message Queue的路由规则关系(即消息队列和交换机的绑定)。当交换机Exchange收到生产者传递的消息Message时会解析其Routing Key,Exchange根据Routing Key与交换机类型Exchange Type将Message路由到消息队列中去。
  • 用户(User):最直接了当的认证方式,谁可以使用当前的消息中间件。
  • 虚拟主机(vHosts):虚拟主机概念,一个Virtual Host里面可以有若干个Exchange和Queue,我们可以控制用户在Virtual Host的权限。
  • 信道(channel):通常情况下生产者或消费者需要与消息中间件之间建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP协议提供了信道(channel)这个概念来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个信道准备的。

AMQP工作模型

rabbitmq

  1. 消息(message)被生产者(producers) 发送给交换机(exchange)。
  2. 交换机将收到的消息根据路由规则分发给绑定的队列(queue)。
  3. AMQP的实现者消息中间件 (例如rabbitmq)会将消息投递给订阅了此队列的消费者(consumers) ,或者消费者按照需求来主动拉取。

消息、队列、交换机

消息、队列、交换机是消息队列相关概念中相对重要的三个概念,也是我们需要重点梳理的三个地方。

消息

RabbitMQ的消息是有属性概念的。有些属性是被消息中间件所使用的,但是大多数是开放给接收它们的应用解释器用的。我们也可以为消息定义消息头(headers)。消息属性需要在消息被发布的时候定义。例如:

  • Content type(内容类型)
  • Content encoding(内容编码)
  • Routing key(路由键)
  • Delivery mode (persistent or not) 投递模式(持久化 或 非持久化)这里需要注意,消息的持久化依赖与队列的持久化,我们需要同步设置。
  • Message priority(消息优先权)
  • Message publishing timestamp(消息发布的时间戳)
  • Expiration period(消息有效期)
  • Publisher application id(发布应用的ID)

消息队列

队列具有以下属性:

  • Name(名称)
  • Durable(持久化):消息中间件重启后,队列是否依旧存在。持久化队列(Durable queues)会被存储在磁盘上,当消息中间件(broker)重启之后,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。这里需要注意队列的持久化和它存储的未被消费消息的持久化是2个概念,队列的持久化并不会使存储的消息持久化。假如消息中间件(broker)重启之后,持久化队列会被重新声明,但它里面存储的消息只有设置过持久化的消息才能被重新恢复。
  • Exclusive(专用队列):可以这样理解当创建这个队列的Connection关闭后队列即被删除,不存在其它的使用可能性。
  • Auto-delete(自动删除):当没有消费者订阅这个队列的时候就会被删除。
  • Arguments(参数):消息中间件用来完成类似设置最大长度死信队列等的参数。

消息队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。 如果声明中的属性(名称除外)与已存在队列的属性有差异,那么则会申明失败。

交换机

交换机Exchange具有以下属性:

  • Name(名称)
  • Durability (持久化):消息代理重启后,交换机是否还存在。交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息中间件(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在消息中间件再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。
  • Auto-delete (自动删除):当所有与之绑定的消息队列都完成了对此交换机的使用后,是否自动删掉它。
  • Arguments(参数):alternate-exchange等,指定无法路由时的辅助路由。

交换机类型

交换机有多种类型,其中Direct ExchangeFanout ExchangeTopic Exchange为常见类型:

Direct Exchange

处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。

rabbitmq

Fanout Exchange

不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

rabbitmq

Topic Exchange

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。

rabbitmq

Headers Exchanges

不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

消息的可靠传递

交换机、队列和消息的持久化

持久化保证在服务器重启的时候可以保持不丢失相关信息,重点解决服务器的异常崩溃而导致的消息丢失问题。但是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐率,在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

  • 持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清楚。
  • 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

交换机、队列和消息默认都是持久化模式。

生产者消息确认机制

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了

为解决这个问题,主要有如下两种方案:

  • 通过事务机制实现
  • 通过生产者消息确认机制(publisher confirm)实现

但是使用事务机制实现会严重降低RabbitMQ的消息吞吐量,我们采用一种轻量级的方案——生产者消息确认机制

什么是消息确认机制

简而言之,就是:生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地。

如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。

再补充一个Mandatory参数:当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。

消费者消息确认机制

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。

死信队列

DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。

当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。

什么是死信呢?什么样的消息会变成死信呢?

  • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)

应用场景分析:

在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了

如何使用死信交换机呢?

定义业务(普通)队列的时候指定参数:

  • x-dead-letter-exchange: 用来设置死信后发送的交换机
  • x-dead-letter-routing-key:用来设置死信的routingKey
有用就打赏一下作者吧!