MQ万字总结
# 1. MQ
# 1.1 优点
- 系统解耦:采用消息队列的方式来处理部分系统调用。上游服务将数据发送到MQ服务,下游服务根据自身的业务需求和消费能力对MQ消息进行消费。
- 异步处理:系统间采用同步调用方式会受到系统吞吐量、并发量、响应时间等各种瓶颈限制。此时可以采用消息队列的方式将非必需或者实时性要求不高的请求异步化。在业务异步处理过程中还可以进行消息分裂,充分利用服务资源提高业务处理效率。
- 削峰填谷:在使用消息队列的场景下,当遇到流量洪峰时消息队列可以将大量消息蓄积在消息队列中,在洪峰结束之后消费者可以继续消费存量消息。如此可以大大提高系统的吞吐量,也有助于提高系统的稳定性和用户体验。在流量会出现剧烈波动场景特别适合使用,特别是电商促销场景。
- 数据分发:在分布式场景下,一条事件消息可能会被多个下游系统关注,大部分消息中间件均支持一对多消费或者消息广播的模式。消息中间件可以根据用户定义的规则进行消息路由分发,下游系统只需要关注自己感兴趣的消息即可。这一特性可以加以利用提高系统的扩展性。
# 1.2 缺点
- 系统可用性降低:系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统还好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整?MQ 一挂,整套系统崩溃,你不就完了?保证消息队列的高可用。
- 系统复杂度提高:硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?
- 一致性问题:A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,数据就不一致了。
# 1.3 MQ中间件对比
综上,各种对比之后,有如下建议:
- 中小型公司,技术挑战不是特别高,用
RabbitMQ
是不错的选择; - 大型公司,基础架构研发实力较强,用
RocketMQ
是很好的选择; - 如果是大数据领域的实时计算、日志采集等场景,用
Kafka
是业内标准的,绝对没问题,社区活跃度很高,几乎是全世界这个领域的事实性规范;
# 2. RabbitMQ
# 2.1 基本原理
RabbitMQ是一个由Erlang语言开发的AMQP
(Advanced Message Queuing Protocol, 高级消息队列协议)的开源实现。
# 2.2 相关概念
消息队列
(Queue):存储还未被消费者消费的消息的容器。消息
(Message): 生产者产生的和消费者处理的消息。交换机
(Exchange):交换机接收生产者发出的消息并且路由到由交换机类型和被称作绑定(bindings)的规则所决定的到队列中,交换机不存储消息。路由键
(Routing key):路由关键字,交换机Exchange的路由规则利用这个关键字进行消息投递到消息队列。绑定键
(Binding Key):Binding Key可以理解为交换机Exchange和消息队列Message Queue的路由规则关系(即消息队列和交换机的绑定)。当交换机Exchange收到生产者传递的消息Message时会解析其Routing Key,Exchange根据Routing Key与交换机类型Exchange Type将Message路由到消息队列中去。用户
(User):最直接了当的认证方式,谁可以使用当前的消息中间件。虚拟主机
(vHosts):虚拟主机概念,一个Virtual Host里面可以有若干个Exchange和Queue,我们可以控制用户在Virtual Host的权限, RabbitMQ默认的vhost是 /。信道
(channel):通常情况下生产者或消费者需要与消息中间件之间建立多个连接。无论怎样,同时开启多个TCP连接都是不合适的,因为这样做会消耗掉过多的系统资源。AMQP协议提供了信道(channel)这个概念来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个信道准备的。
# 2.2.1 Queue消息队列
消息队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。 如果声明中的属性(名称除外)与已存在队列的属性有差异,那么则会申明失败。
队列具有以下属性:
Name
(名称)Durable
(持久化):消息中间件重启后,队列是否依旧存在。持久化队列(Durable queues)会被存储在磁盘上,当消息中间件(broker)重启之后,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。这里需要注意队列的持久化和它存储的未被消费消息的持久化是2个概念,队列的持久化并不会使存储的消息持久化。假如消息中间件(broker)重启之后,持久化队列会被重新声明,但它里面存储的消息只有设置过持久化的消息才能被重新恢复。Exclusive
(专用队列):可以这样理解当创建这个队列的Connection关闭后队列即被删除,不存在其它的使用可能性。Auto-delete
(自动删除):当没有消费者订阅这个队列的时候就会被删除。Arguments
(参数):消息中间件用来完成类似设置最大长度死信队列等的参数。
# 2.2.2 Message消息
消息属性需要在消息被发布的时候定义。例如:
Content type
(内容类型)Content encoding
(内容编码)Routing key
(路由键)Delivery mode
(persistent or not) 投递模式(持久化 或 非持久化)这里需要注意,消息的持久化依赖于队列的持久化,我们需要同步设置。Message priority
(消息优先权)Message publishing timestamp
(消息发布的时间戳)Expiration period
(消息有效期)Publisher application id
(发布应用的ID)
# 2.2.3 Exchange交换机
交换机Exchange具有以下属性:
Name
(名称)Durability
(持久化):消息代理重启后,交换机是否还存在。交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息中间件(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在消息中间件再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。Auto-delete
(自动删除):当所有与之绑定的消息队列都完成了对此交换机的使用后,是否自动删掉它。Arguments
(参数):alternate-exchange等,指定无法路由时的辅助路由。
# 2.2.3.1 交换机类型
Direct
(直接): 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。FanOut
(分发):你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。Topic
: 将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”
# 2.2.4 TTL
TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共两种。
- 在消息发送时可以进行指定。通过配置消息体的properties,可以指定当前消息的过期时间。
- 在创建Exchange时可进行指定。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。
# 2.2.5 消费端ACK与NACK
消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。由于服务器宕机等严重问题,我们需要手动进行ACK保障消费端消费成功。 消费端重回队列(retry)是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列,也就是设置为false。
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple bool, requeue bool) error
Reject(tag uint64, requeue bool) error
}
2
3
4
5
# 2.2.6 生产者Confirm机制
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
- 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是消息的可靠性投递的核心保障!
# 2.2.7 Return消息机制
Return Listener用于处理一些不可路由的消息。
我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener。
基础API中有个关键的配置项Mandatory
:如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。
通过chennel.addReturnListener(ReturnListener rl)传入已经重写过handleReturn方法的ReturnListener。
# 2.2.8 消费端如何限流
当海量消息瞬间推送过来,单个客户端无法同时处理那么多数据,严重会导致系统宕机。这时,需要削峰。 RabbitMQ提供了一种qos(服务质量保证)功能。即在非自动确认消息的前提下(非ACK),如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息。
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
# 2.3 消息
# 2.3.1 交换机、队列和消息的持久化
持久化保证在服务器重启的时候可以保持不丢失相关信息,重点解决服务器的异常崩溃而导致的消息丢失问题。但是,将所有的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐率,在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。
- 持久化的消息在到达队列时就被写入到磁盘,并且如果可以,持久化的消息也会在内存中保存一份备份,这样可以提高一定的性能,只有在内存吃紧的时候才会从内存中清楚。
- 非持久化的消息一般只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。
交换机、队列和消息默认都是持久化模式。
# 2.3.2 消息的可靠性传输
# 2.3.2.1 生产者消息确认机制
生产者消息确认机制是防止生产者往RabbitMQ写入数据时出现数据丢失。可以通过事务或者Confirm机制。
# 2.3.2.1.1 事务
可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect() ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback() ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit() 。 缺点:RabbitMQ 事务机制(同步),基本上吞吐量会下来,因为太耗性能
# 2.3.2.1.2 Confirm机制
一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
生产者消息确认机制和事物的差异 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。 所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
# 2.3.2.2 持久化
持久化主要是为了避免RabbitMQ 弄丢了数据,必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。 所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。
# 2.3.2.3 消费者消息确认机制
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。 RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。 这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
为了保证消息从队列中可靠地到达消费者,RabbitMQ 提供了消息确认机制。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false,RabbitMQ 会等待消费者显式发回 ack 信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ 会在队列中立即删除它。
# 2.3.3 消息的不可重复消费
消息重复大体上有两种情况会出现:
- 消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;
- 消息消费失败,由于retry重试机制,重新入队又将消息发送出去。
解决方法有三种:
- 消费方业务接口做好幂等;
- 消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;
- 消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。 这里只推荐第一种,业务方法幂等这是最直接有效的方式,2还要和数据库产生交互,3有可能导致第一次消费失败但第二次消费成功的情况被砍掉。
# 2.3.4 消息的顺序性
RabbitMQ 多消费实例情况下要想保证消息的顺序性,非常困难,细节非常多, 对于需要保证顺序消费的业务,可以只部署一个消费者实例.
# 2.4 死信队列
当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中(指定好相关参数,rabbitmq会自动发送)。 什么是死信呢?什么样的消息会变成死信呢?
- 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
- 消息TTL过期
- 队列达到最大长度(队列满了,无法再添加数据到mq中)
# 2.4.1 应用场景
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
如何使用死信交换机呢?定义业务(普通)队列的时候指定参数:
x-dead-letter-exchange
: 用来设置死信后发送的交换机x-dead-letter-routing-key
:用来设置死信的routingKey
# 2.5 高可用
RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的。 RabbitMQ集群中节点包括内存节点、磁盘节点。内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘上。如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。一个集群至少要有一个磁盘节点。
# 2.5.1 单机
单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。
# 2.5.2 普通集群
普通集群模式(无高可用),意思就是在多台机器上启动多个 RabbitMQ 实例,每台机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。
这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。 而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。 普通集群模式,并不保证队列的高可用性。尽管交换机、绑定这些可以复制到集群里的任何一个节点,但是队列内容不会复制。虽然该模式解决一项目组节点压力,但队列节点宕机直接导致该队列无法应用,只能等待重启。所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,也就是必须要创建镜像队列。
# 2.5.3 镜像集群
在镜像集群模式下,你创建的 queue,无论是元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
镜像模式配置非常简单,首先镜像模式是基于普通集群模式的,所以前面搭建的集群就是普通模式集群了。在搭建好的集群模式下,选择一个节点,通过命令或者MQ管理后台设置策略为镜像模式即可。 我们在前面的搭建的集群环境基础上,选择节点1进行设置:
docker exec -it myrabbit1 bash
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
exit
2
3
也可以通过MQ管理后台设置:
# 2.5.3.1 缺点
- 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很重
- 不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?
# 3. Kafka
Kafka采用分布式架构,天然支持高可用。
# 3.1 相关概念
- topic:主题,同一类消息抽象的一个概念
- producer:发布者,负责发布消息到kafka集群/服务器
- consumer:订阅者,负责订阅消费kafka集群/服务器中的消息
- consumer group:订阅者组,同一个topic的消息可以被多个consumer订阅消费,这一类consumer就是consumer group
- broker:代理者,单个kafka服务器节点称为broker
- partition:分区,一个topic的所有消息数据,被分开存储在不同的地方,这个存储单位称为partition
3.2 Topic Kakfa为每个主题(topic)维护了一个分区(partition)的日志结构。每个主题的数据包含一个或多个分区(partiton),每个分区以一个提交日志文件(夹)的形式存在。 每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。每条发给主题的消息都被分配一个在本分区内唯一的序号,这个序号被称为偏移量(offset)。 Kafka集群会保留所有被发布的消息一段时间,无论这些消息是否已经被消费,超过日志保留期(log retention)的消息就会被丢弃以释放空间。 同时,各分区的数据会按照的配置的量被复制到集群(cluster)上的其他服务器上。每个分区都有一个“领导(leader)”服务器和0到多个”追随者(follower)”服务器。领导服务器负责对这个分区的所有读写操作,而追随者服务器则被动的复制领导服务器。如果领导服务器宕机,其中一台追随者服务器会自动被选举为新领导。
# 3.3 Partition
Kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(segment)。Segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。
每个partition都是⼀个有序并且不可变的消息记录集合。当新的数据写⼊时,就被追加到partition的末 尾。在每个partition中,每条消息都会被分配⼀个顺序的唯⼀标识,这个标识被称为offset,即偏移 量。注意,Kafka只保证在同⼀个partition内部消息是有序的,在不同partition之间,并不能保证消息 有序。
# 3.4 多副本机制
Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。 生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。 Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?
- Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
- Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
- 当主分区(leader)故障的时候会选择一个备胎(follower)上位,成为Leader.在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
# 3.5 消息存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢? 1、 基于时间,默认配置是168小时(7天)。 2、 基于大小,默认配置是1073741824。
# 3.6 Producer消息生产者
# 3.6.1 消息发布流程
producer就是生产者,是数据的入口。Producer在写入数据的时候会把数据 写入到leader中,不会直接将数据写入follower, 流程如下:
- ⽣产者从Kafka集群获取分区leader信息
- ⽣产者将消息发送给leader
- leader将消息写入本地磁盘
- follower从leader拉取消息数据
- follower将消息写入本地磁盘后向leader发送ACK
- leader收到所有的follower的ACK之后向生产者发送ACK
# 3.6.2 partion选择
消息生产者(producer)向主题(topic)发布消息,生产者自身要负责决定把消息发布到主题的具体哪个分区(partition)。分区的选择可以使简单的轮盘式,或者基于某种语义分区功能(如根据消息中的某键值来运算目标分区编号),大概如下:
- partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
- 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
- 如果既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时间的数据写入某 个partition,下一小段的时间写入下一个partition
# 3.6.3 ACK机制
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
- 1(默认):这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
- 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性却是最低的。
- all:producer需要等待所有follower都确认接收到数据后才算一次发送完成,可靠性最高。
# 3.6.4 Golang客户端的使用
我们可以使用 Sarama 库的 AsyncProducer 或 SyncProducer 生产消息。在大多数情况下首选使用 AsyncProducer 生产消息。它通过一个 channel 接收消息,并在后台尽可能高效的异步生产消息。 SyncProducer 发送 Kafka 消息后阻塞,直到接收到 ACK 确认。SyncProducer 有两个警告:它通常效率较低,并且实际的耐用性保证取决于 Producer.RequiredAcks 的配置值。在某些配置中,有时仍会丢失由 SyncProducer 确认的消息,但是使用比较简单。
config := sarama.NewConfig()
//等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
//随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoResponse这里才有用.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
2
3
4
5
6
7
8
# 3.7 Consumer消息消费者
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id。同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据。
在一个特定的消费组内,每个分区(partition)被且仅被分配到一个消费者实例上,也就是说每个分区对应一个消费者,那这个消费者可以严格按照顺序收到消息。该模式决定了每个消费组中的有效消费者数量一定小于或者等于主题包含的分区数量。 如此一来,Kafka仅在同一分区内保证了消息的顺序性,而不保证跨分区的消息顺序性。如果确实需要严格的总体顺序性,则主题只能包含一个分区。
# 3.8 消息的顺序性
Kafka 只能为我们保证 Partition(分区) 中的消息有序。 消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。 所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。 Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。 总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
# 3.9 高可用
Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。 这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。
写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为) 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。
# 3.10 消息的可靠性传输
# 3.10.1 Kafka丢失数据
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据? 所以此时一般是要求起码设置如下 4 个参数:
- 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
- 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
- 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
- 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。 我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
# 3.10.2 生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
# 3.10.3 消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
// AutoCommit specifies configuration for commit messages automatically.
AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration
}
2
3
4
5
6
7
8
9
10
# 3.11 消息的不可重复消费
kafka出现消息重复消费的原因:
- 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
- Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。
解决方案:
- 消费消息服务做幂等校验,比如 Redis 的set、MySQL 的主键等天然的幂等功能。这种方法最有效。
- 将 enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交offset合适?
- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
# 3.12 Kafka如何实现高性能
- Kafka就是基于页缓存技术 + 磁盘顺序写 技术实现了写入数据的超高性能。
- 零拷贝技术(zero-copy)
# 4. NSQ
NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优势有以下优势:
- NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
- NSQ支持横向扩展,没有任何集中式代理。
- NSQ易于配置和部署,并且内置了管理界面。
NSQ特征:
- 消息默认不可持久化,虽然系统支持消息持久化存储在磁盘中(通过设置 –mem-queue-size 为零),不过默认情况下消息都在内存中
- 消息最少会被投递一次,假设成立于 nsqd 节点没有错误
- 消息无序,是由重新队列(requeue),内存和磁盘存储的混合导致的,实际上,节点间不会共享任何信息。它是相对的简单完成疏松队列
- 支持无 SPOF 的分布式拓扑,nsqd 和 nsqadmin 有一个节点故障不会影响到整个系统的正常运行
- 支持requeue,延迟消费机制 暂时无法在飞书文档外展示此内容
# 4.1 主要组件
- nsqd:nsqd 是一个守护进程,负责接收(生产者 producer )、排队(最小堆实现)、投递(消费者 consumer )消息给客户端。它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的。
- nsqlookupd:nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口:TCP 接口, nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。
- nsqadmin:nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
# 4.2 nsqd
nsqd负责提供集群的主要功能,一个nsqd实例同时处理多个数据流。与Kafka类似,一个数据流也可以被称为一个topic。NSQ还有一个Channel的概念,类似于Kafka的ConsumerGroup,一个topic可以对应多个Channel,不同的是,每个Channel都存储一份完整的数据(完整的未消费的数据),而Kafka则是存储同一份数据,通过Offset来区分不同的ConsumerGroup的消费进度。
- Consumer需要指定一个Channel才能消费数据。
- 一个Channel通常会与多个Consumer Client建立连接。当channel中存在待处理的消息时,Channel会从多个Client中随机选择一个进行推送,整个流程如下图所示:
概括来说,一条消息从Topic广播到Channel(每个Channel都会收到一份消息),然后再从Channel分发到Consumer。
# 4.3 Producer消息生产者
- producer 通过 HTTP API 将消息发布到 nsqd 的指定 topic ,一般有 pub/mpub 两种方式, pub 发布一个消息, mpub 一个往返发布多个消息。
- producer 也可以通过 nsqd客户端 的 TCP接口 将消息发布给 nsqd 的指定 topic 。
- 当生产者 producer 初次发布带 topic 的消息给 nsqd 时,如果 topic 不存在,则会在 nsqd 中创建 topic。
# 4.4 Channel消息传递的通道
- 当生产者每次发布消息的时候,消息会采用多播的方式被拷贝到各个 channel 中, channel 起到队列的作用。
- channel 与 consumer(消费者) 相关,是消费者之间的负载均衡,消费者通过这个特殊的channel读取消息。
- 一个 channel 一般会有多个 consumer 连接。假设所有已连接的 consumer 处于准备接收消息的状态,每个消息将被传递到一个随机的 consumer。
# 4.5 Consumer消息消费者
- consumer 通过 TCP subscribe 自己需要的 channel
- topic 和 channel 都没有预先配置。 topic 由第一次发布消息到命名 topic 的 producer 创建 或 第一次通过 subscribe 订阅一个命名 topic 的 consumer 来创建。 channel 被 consumer 第一次 subscribe 订阅到指定的 channel 创建。
- 多个 consumer subscribe一个 channel,假设所有已连接的客户端处于准备接收消息的状态,每个消息将被传递到一个 随机 的 consumer。
- NSQ 支持延时消息, consumer 在配置的延时时间后才能接受相关消息。
- Channel在 consumer 退出后并不会删除,这点需要特别注意。
# 4.6 链接方式
- 直链nsqd 使用的客户端库是官方库 go-nsq,使用直接连nsqd的方式:
adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"}
config := nsq.NewConfig()
topicName := "testTopic1"
c, _ := nsq.NewConsumer(topicName, "ch1", config)
testHandler := &MyTestHandler{consumer: c}
c.AddHandler(testHandler)
if err := c.ConnectToNSQDs(adds); err != nil {
panic(err)
}
2
3
4
5
6
7
8
9
10
11
如果有nsqd出现问题,现在的处理方式,go-nsq客户端会每隔一段时间执行一次重连操作。 如果对nsqd进行横向扩充,只能是自己额外的写一些代码调用ConnectToNSQDs或者ConnectToNSQD方法
- 去中心化连接方式 nsqlookupd 官方推荐使用连接nsqlookupd的方式,nsqlookupd用于做服务的注册和发现,这样可以做到去中心化。
# 4.6.1 去中心化实现原理
nsqlookupd用于管理整个网络拓扑结构,nsqd用它实现服务的注册,客户端使用他得到所有的nsqd服务节点信息。
实现原理如下:
- nsqd把自己的服务信息广播给一个或者多个nsqlookupd
- 客户端连接一个或者多个nsqlookupd,通过nsqlookupd得到所有的nsqd的连接信息,进行连接消费
- 如果某个nsqd出现问题,down机了,会和nsqlookupd断开,这样客户端从nsqlookupd得到的nsqd的列表永远是可用的。客户端连接的是所有的nsqd,一个出问题了就用其他的连接,所以也不会受影响。
# 4.7 消息的可靠性传输
# 4.7.1 NSQ服务器对消息的处理
在发送中的数据,存在的各种不确定性,nsq的处理方式是:对发送给客户端信息设置为在飞翔中,如果在如果处理成功就把这个消息从飞翔中的状态中去掉,如果在规定的时间内没有收到客户端的反馈,则认为这个消息超时,然后重新归队,两次进行处理。所以无论是哪种特殊情况,nsq统一认为消息为超时。 nsq启动的时候启动协程去处理channel的过期数据,在扫描channel的时候,如果发现有过期数据后,会重新放回到队列,进行重发操作。
以上操作即:
- 消费确认
- 过期重发
# 4.7.2 消费者丢失消息
在服务端发送消息给客户端后,如果在处理业务逻辑时,如果发生错误则给服务器发送Requeue命令告诉服务器,重新发送消息进处理。如果处理成功,则发送Finish命令
# 4.7.3 消息持久化
默认的情况下,只有内存队列不足时MemQueueSize:10000时,才会把数据保存到文件内进行持久到硬盘。 如果将 --mem-queue-size设置为 0,所有的消息将会存储到磁盘。我们不用担心消息会丢失,nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
NSQ在消息送到达NSQD(NSQ核心服务)服务后会先存储在内存中,当内存中消息累积到一定量后才会落到数据盘中。NSQD节点之间无法实现分布式协作,并且单点故障时会出现消息丢失。
# 4.8 消息备份/回放
但是由于消息是写到一个单一的 nsqd 的,如果那个 nsqd 部署机器挂掉无法恢复,可能会导致这个 nsqd 中积压的消息丢失。所以我们还需要设计消息备份和异常回放的机制。
# 4.8.1 备份
NSQ 提供了 nsq_to_file 工具,可以用来做消息备份。所有消息实时备份到本地文件,按小时切割文件,消息文件三备份,备份数据存储 n 天。 需要备份的消息使用 nsq_to_nsq 工具,同步写入备份队列,备份机器上运行 nsq_to_file 订阅备份队列的数据,写到磁盘备份。
# 4.8.2 回放
- 机器挂掉能重启恢复,重启恢复后重启服务即可,nsqd 的内存队列大小设置为 0,数据全部落盘,重启不会丢失数据。
- 机器不能重启恢复的情况下,从备份数据中回放该 nsqd 的备份消息。 需要注意回放时对消息去重,因为写备份时采用全部备份写成功才算成功的方案,可能会导致消息重复。
# 4.9 NSQ与Kafka差异
# 4.9.1 存储
- NSQ 默认是把消息放到内存中,只有当队列里消息的数量超过–mem-queue-size配置的限制时,才会对消息进行持久化。
- Kafka 会把写到磁盘中进行持久化,并通过顺序读写磁盘来保障性能。持久化能够让Kafka做更多的事情:消息的重新消费(重置offset);让数据更加安全,不那么容易丢失。同时Kafka还通过partition的机制,对消息做了备份,进一步增强了消息的安全性。
# 4.9.2 推拉模型
- NSQ 使用的是推模型,推模型能够使得时延非常小,消息到了马上就能够推送给下游消费,但是下游消费能够无法控制,推送过快可能导致下游过载。
- Kafka 使用的拉模型,拉模型能够让消费者自己掌握节奏,但是这样轮询会让整个消费的时延增加,不过消息队列本身对时延的要求不是很大,这一点影响不是很大。
# 4.9.3 消息的顺序性
- NSQ 因为不能够把特性消息和消费者对应起来,所以无法实现消息的有序性。
- Kafka 因为消息在Partition中写入是有序的,同时一个Partition只能够被一个Consumer消费,这样就可能实现消息在Partition中的有序。自定义写入哪个Partition的规则能够让需要有序消费的相关消息都进入同一个Partition中被消费,这样达到”全局有序“
# 5. MQ常见问题
# 5.1 MQ大量积压
所谓消息积压一般是由于消费端消费的速度远小于生产者发消息的速度,导致大量消息在 MQ 的队列中无法消费。 既然消费者速度跟不上生产者,那么提高消费者的速度!先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,有以下几种思路:
- 对生产者发消息接口进行适当限流(不太推荐,影响用户体验)
- 多部署几台消费者实例(推荐)
- 适当增加 prefetch 的数量(RabbitMQ),让消费端一次多接受一些消息(推荐,可以和第二种方案一起用)
// 设置Qos
if err := c.channel.Qos(1, 0, false); err != nil {
return nil, err
}
2
3
4
# 5.2 MQ消息过期失效
RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。