RabbitMQ如何保证消息不丢失
RabbitMQ中保证消息不丢失的方法有以下几种:
- 消息持久化
- ACK确认机制
- 设置集群镜像模式
- 消息补偿机制
# 消息持久化
RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。
要想做到消息持久化,必须满足以下三个条件,缺一不可:
- Exchange 设置持久化
- Queue 设置持久化
- Message持久化发送
PHP示例代码:
// http://docs.php.net/manual/da/book.amqp.php
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
// 创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT);
// 交换机持久化
$ex->setFlags(AMQP_DURABLE);
// 创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
// 队列持久化
$q->setFlags(AMQP_DURABLE);
// delivery_mode => 2 消息持久化
$ex->publish($message, $k_route, AMQP_NOPARAM, array('delivery_mode' => 2))
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# ACK确认机制
多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?
这个使用就要使用Message acknowledgment机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除,这样即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不会丢失。
PHP示例代码:
/**
* 消费回调函数
* 处理消息
*/
$processMessage = function($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
};
// 绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";
$q->consume($processMessage);
// $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
$conn->disconnect();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 设置集群镜像模式
先介绍下RabbitMQ三种部署模式:
- 单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
- 普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
- 镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案
为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:
如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。
下面介绍下三种HA策略模式:
- 1)同步至所有的
- 2)同步最多N个机器
- 3)只同步至符合指定名称的nodes
命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
2
2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
2
3)为每个以“node.”开头的队列分配指定的节点做镜像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
2
但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降
# 消息补偿机制
为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,
比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?
产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。
然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。