RabbitMQ如何保证消息不丢失

RabbitMQ中保证消息不丢失的方法有以下几种:

  • 消息持久化
  • ACK确认机制
  • 设置集群镜像模式
  • 消息补偿模式

消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

要想做到消息持久化,必须满足以下三个条件,缺一不可:

  • Exchange 设置持久化
  • Queue 设置持久化
  • Message持久化发送

PHP示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// http://docs.php.net/manual/da/book.amqp.php
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

// 创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 交换机持久化
$ex->setFlags(AMQP_DURABLE);

// 创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
// 队列持久化
$q->setFlags(AMQP_DURABLE);

// delivery_mode => 2 消息持久化
$ex->publish($message, $k_route, AMQP_NOPARAM, array('delivery_mode' => 2))

ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

这个使用就要使用Message acknowledgment机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除,这样即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不会丢失。

PHP示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

/**
* 消费回调函数
* 处理消息
*/
$processMessage = function($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
};

// 绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


$q->consume($processMessage);
// $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答

$conn->disconnect();

设置集群镜像模式

先介绍下RabbitMQ三种部署模式:

  • 单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
  • 普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
  • 镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

RabbitMQ

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

  • 1)同步至所有的
  • 2)同步最多N个机器
  • 3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式

1
2
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式

1
2
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像

1
2
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降

消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

RabbitMQ

有用就打赏一下作者吧!