Gitlib Gitlib
首页
  • 分类
  • 标签
  • 归档
  • Golang开发实践万字总结
  • MySQL核心知识汇总
  • Redis实践总结
  • MQ实践万字总结
  • Docker数据持久化总结
  • Docker网络模式深度解读
  • 常用游戏反外挂技术总结
  • 读书笔记
  • 心情杂货
  • 行业杂谈
  • 友情链接
关于我
GitHub (opens new window)

Ravior

以梦为马,莫负韶华
首页
  • 分类
  • 标签
  • 归档
  • Golang开发实践万字总结
  • MySQL核心知识汇总
  • Redis实践总结
  • MQ实践万字总结
  • Docker数据持久化总结
  • Docker网络模式深度解读
  • 常用游戏反外挂技术总结
  • 读书笔记
  • 心情杂货
  • 行业杂谈
  • 友情链接
关于我
GitHub (opens new window)
  • 基础架构

  • MQ

    • RabbitMQ

      • RabbitMQ开发环境搭建
      • RabbitMQ集群实践
      • RabbitMQ如何保证消息不丢失
        • 消息持久化
        • ACK确认机制
        • 设置集群镜像模式
        • 消息补偿机制
      • 基于RabbitMQ消息延时队列方案
    • Kafka

    • MQ万字总结
  • 微服务

  • 分布式

  • 高并发

  • 大数据

  • 容器化

  • 架构设计
  • MQ
  • RabbitMQ
Ravior
2018-02-15
目录

RabbitMQ如何保证消息不丢失

RabbitMQ中保证消息不丢失的方法有以下几种:

  1. 消息持久化
  2. ACK确认机制
  3. 设置集群镜像模式
  4. 消息补偿机制

# 消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

要想做到消息持久化,必须满足以下三个条件,缺一不可:

  • Exchange 设置持久化
  • Queue 设置持久化
  • Message持久化发送

PHP示例代码:

// http://docs.php.net/manual/da/book.amqp.php
$conn = new AMQPConnection($conn_args);   
if (!$conn->connect()) {   
    die("Cannot connect to the broker!\n");   
}   
$channel = new AMQPChannel($conn); 

// 创建交换机    
$ex = new AMQPExchange($channel);   
$ex->setName($e_name); 
$ex->setType(AMQP_EX_TYPE_DIRECT); 
// 交换机持久化
$ex->setFlags(AMQP_DURABLE); 

// 创建队列    
$q = new AMQPQueue($channel); 
$q->setName($q_name);   
// 队列持久化
$q->setFlags(AMQP_DURABLE);  

// delivery_mode => 2 消息持久化
$ex->publish($message, $k_route, AMQP_NOPARAM, array('delivery_mode' => 2))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

这个使用就要使用Message acknowledgment机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除,这样即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不会丢失。

PHP示例代码:


/**
 * 消费回调函数
 * 处理消息
 */ 
$processMessage = function($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo $msg."\n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
}; 

// 绑定交换机与队列,并指定路由键 
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";


$q->consume($processMessage);   
// $q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答  

$conn->disconnect(); 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 设置集群镜像模式

先介绍下RabbitMQ三种部署模式:

  • 单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
  • 普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
  • 镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

RabbitMQ

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

  • 1)同步至所有的
  • 2)同步最多N个机器
  • 3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
1
2

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式

rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
1
2

3)为每个以“node.”开头的队列分配指定的节点做镜像

rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
1
2

但是:HA 镜像队列有一个很大的缺点就是: 系统的吞吐量会有所下降

# 消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

RabbitMQ

#RabbitMQ#MQ
上次更新: 2022/12/01, 11:09:34
RabbitMQ集群实践
基于RabbitMQ消息延时队列方案

← RabbitMQ集群实践 基于RabbitMQ消息延时队列方案→

最近更新
01
常用游戏反外挂技术总结
11-27
02
Golang开发实践万字总结
11-11
03
Redis万字总结
10-30
更多文章>
Theme by Vdoing | Copyright © 2011-2022 Ravior | 粤ICP备17060229号-3 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式