Kafka基础入门
Kafka是目前热门的分布式发布-订阅消息系统。具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。
# 消息系统分类
# 点对点模式
点对点模式是最基础的消息模式,由单一服务器维持一个消息queue, 消息生成者生成消息发送(push/入列)到queue, 消息消费者从queue中取出(pull/出列)并消费消息。
点对点模式的消息系统常见的有:httpsqs/Redis实现队列。都有如下特点:
- 消息被消费后,消息queue中不再会有存储,所以消息消费者不可能消费到已经消费的消息。
- Queue支持多个消费者,但是对于一个消息而言,只有被一个消费者消费。
# 发布/订阅模式
消息发布者发布消息到topic, 所有消息消费(订阅)都会消费该消息。与点对点模式不同,发布/订阅更像一对多模式。
# Kafka基础概念
- topic:主题,同一类消息抽象的一个概念
- producer:发布者,负责发布消息到kafka集群/服务器
- consumer:订阅者,负责订阅消费kafka集群/服务器中的消息
- consumer group:订阅者组,同一个topic的消息可以被多个consumer订阅消费,这一类consumer就是consumer group
- broker:代理者,单个kafka服务器节点称为broker
- partition:分区,一个topic的所有消息数据,被分开存储在不同的地方,这个存储单位称为partition
# Kafka工作流程
- 生产者定期向主题发送消息。
- Kafka代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka将在第一分区中存储一个消息,在第二分区中存储第二消息。
- 消费者订阅特定主题。
- 一旦消费者订阅主题,Kafka将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper系综中。
- 消费者将定期请求Kafka(如100 Ms)新消息。
- 一旦Kafka收到来自生产者的消息,它将这些消息转发给消费者。
- 消费者将收到消息并进行处理。
- 一旦消息被处理,消费者将向Kafka代理发送确认。
- 一旦Kafka收到确认,它将偏移更改为新值,并在Zookeeper中更新它。 由于偏移在Zookeeper中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
- 以上流程将重复,直到消费者停止请求。
- 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。
# Topic
主题(topic)是Kafka消息发布的所在,kakfa为每个主题**(topic)维护了一个分区(partition)**的日志结构。每个主题的数据包含一个或多个分区(partiton),每个分区以一个提交日志文件(夹)的形式存在。
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。每条发给主题的消息都被分配一个在本分区内唯一的序号,这个序号被称为偏移量(offset)。
Kafka集群会保留所有被发布的消息一段时间,无论这些消息是否已经被消费,超过日志保留期(log retention)的消息就会被丢弃以释放空间。
同时,各分区的数据会按照的配置的量被复制到集群(cluster)上的其他服务器上。每个分区都有一个“领导(leader)"服务器和0到多个"追随者(follower)”服务器。领导服务器负责对这个分区的所有读写操作,而追随者服务器则被动的复制领导服务器。如果领导服务器宕机,其中一台追随者服务器会自动被选举为新领导。
# Partition 结构
Kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(segment)。Segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下,假设有1000条消息,每个Segment大小为100,下面展现了900-1000的索引和Log:
由于kafka消息数据太大,如果全部建立索引,即占了空间又增加了耗时,所以kafka选择了稀疏索引的方式,这样的话索引可以直接进入内存,加快偏查询速度。
简单介绍一下如何读取数据,如果我们要读取第911条数据首先第一步,找到他是属于哪一段的,根据二分法查找到他属于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11这个索引或者小于11最近的索引,在这里通过二分法我们找到了索引是[10,1367]然后我们通过这条索引的物理位置1367,开始往后找,直到找到911条数据。
面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
1、 基于时间,默认配置是168小时(7天)。
2、 基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
主题日志分区有以下好处:
- 突破单一服务器处理能力的限制,允许数据任意增长;
- 实现并发性,因为数据是分散在不同的broker上,实现了类似的负载均衡。
- 因为分区的数据被复制到了集群其他机器上,实现了容错和容灾。
# Producer
消息生产者(producer)向主题(topic)发布消息,生产者自身要负责决定把消息发布到主题的具体哪个分区(partition)。分区的选择可以使简单的轮盘式,或者基于某种语义分区功能(如根据消息中的某键值来运算目标分区编号),大概如下:
- 如果没有Key值则进行轮询发送。
- 如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区,如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
- 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
- 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
- -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。
# Consumer
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据
# 消费模型
消息由生产者(producer)发送到kafka集群后,会被消费者(consumer)消费。一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)。
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。如果采用push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。
Kafka采取拉取模型(poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。
# 消费者组(Consumer Group)
Kafka在发布/订阅模式之上,加入了组group的概念,每一个消费者consumer都属于一个消费组consumer group,每个组group可以有多个消费者consumer,每个消费者也可以加入多个消费组consumer group。发送到主题topic下的消息,会被订阅了这个主题topic的每个消费组consumer group消费。但是注意,一条消息只能被一个消费者组consumer group内的一个消费者consumer消费。 也就是说,假设所有的消费组consumer group都订阅了主题topic,如果所有的消费者consumer都在同一个消费组consumer group中, 那么是P2P模式,消息会在组内所有的消费者consumer之间负载均衡;相反,如果所有的消费者consumer都在自己单独的消费组consumer group中,那么每个消费者consumer都可以同时消费这个主题topic下的消息
同一消费者组的消费者可以使不同的进程、应用程序,甚至在不同的机器上。
在一个特定的消费组内,每个分区(partition)被且仅被分配到一个消费者示例上,也就是说每个分区对应一个消费者,那这个消费者可以严格按照顺序收到消息。该模式决定了每个消费组中的有效消费者数量一定小于或者等于主题包含的分区数量。
如此一来,Kafka仅在同一分区内保证了消息的顺序性,而不保证跨分区的消息顺序性。如果确实需要严格的总体顺序性,则主题只能包含一个分区。
# Partiton
Partition中的每条Message由offset来表示它在partition中的偏移量,但是这个offset不是该Message在partition数据文件中的实际存储值,而只是一个逻辑上的值,它唯一确定了partition中的一条Message, 因此,可以认为offset是patition中Message的id。
在Kafka中仅保存了每个consumer实例已经处理数据的offset, 这样就可以:
- 保存的数据量少
- 当consumer出错重新启动时,只需要从最近的offset开始处理数据即可。
每个Partition由多个segment组成,broker收到发布消息往对应的Partition的最后一个segment上添加数据,当某个segment上的消息条数达到配置值或者消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息,订阅者才能订阅消费到。segment达到一定大小后将不会再往该segment写输入,broker会创建新的segment。
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!
1、 partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。
# 集群
集群部署最常见的有两种,一种是mysql的 主备 (master-slave),另一种是zookeeper的 领导者追随者 (leader-follower)。传统的主备系统可以同时对外提供服务,只不过master提供读写而slave只提供读。但是像zookeeper这种的却不一样, 只有leader对外提供服务 ,通俗一点说,follower只是用来等leader挂了,然后重新选举,有机会就自己当leader,没有机会就一直等着。等leader挂了, 重新选举,leader挂了,重新选举,如此循环(有的时候,不是你的,注定不是你的).....
这么来说,其实主备系统可能命还好一点,虽然没有改动权,但是还能对外提供服务(至少能露露脸)。
在Kafka中,这些备份下来的日志被称为 副本replica
为什么目前会更新leader和follower模式,follower只用作备份而不对外提供服务呢。因为对于秒级几十万的写入读取并发量,主从一致是很难做到的。对于出现已经将消息写入在了leader的分区partition中,但是没有来得及备份到follower的情况,如果A和B分别访问leader和follower的该分区partition,得到的是A消费了该数据,而B没有消费该数据的不同的结果,这样做不到消息获取的 幂等性 。
因此,新的leader-follower模式中,只有leader对外提供读写服务。leader会对设好的副本因子数对每一份分区partition都进行冗余备份,假设副本因子数为3,那么就会分别在三个不同的服务器broker中进行备份。 Kafka会保证同一个分区partition的多个replica一定不会分配在同一台服务器broker上 。如下图
# 持久化
Kafka对消息的存储和缓存依赖于文件系统,每次接收数据都会往磁盘上写,人们对于**“磁盘速度慢”**的普遍印象,使得人们对于持久化的架构能够提供强有力的性能产生怀疑。
事实上,磁盘的速度比人们预期的要慢的多,也快得多,这取决于人们使用磁盘的方式。而且设计合理的磁盘结构通常可以和网络一样快。
通过上图对比,我们可以看出实际上顺序磁盘访问在某些情况下比随机内存访问还要快,其实Kafka就是利用这一优势来实现高性能写磁盘
顺序写磁盘
另外还有非常关键的一点,Kafka在写数据的时候是以磁盘顺序写的方式来落盘的,也就是说,仅仅将数据追加到文件的末尾(append),而不是在文件的随机位置来修改数据。
对于普通的机械硬盘如果你要是随机写的话,确实性能极低,这里涉及到磁盘寻址的问题。但是如果只是追加文件末尾按照顺序的方式来写数据的话,那么这种磁盘顺序写的性能基本上可以跟写内存的性能本身是差不多的。
来总结一下: Kafka就是基于页缓存技术 + 磁盘顺序写 技术实现了写入数据的超高性能。
所以要保证每秒写入几万甚至几十万条数据的核心点,就是尽最大可能提升每条数据写入的性能,这样就可以在单位时间内写入更多的数据量,提升吞吐量。
# 零拷贝技术(zero-copy)
说完了写入这块,再来谈谈消费这块。
大家应该都知道,从Kafka里我们经常要消费数据,那么消费的时候实际上就是要从kafka的磁盘文件里读取某条数据然后发送给下游的消费者,如下图所示:
如果Kafka以上面这种方式从磁盘中读取数据发送给下游的消费者,大概过程是:
- 先看看要读的数据在不在os cache中,如果不在的话就从磁盘文件里读取数据后放入os cache
- 接着从操作系统的os cache 里拷贝数据到应用程序进程的缓存里,再从应用程序进程的缓存里拷贝数据到操作系统层面的Socket缓存里,最后从Soket缓存里提取数据后发送到网卡,最后发送出去给下游消费者
整个过程如下图:
从上图可以看出,这整个过程有两次没必要的拷贝
一次是从操作系统的cache里拷贝到应用进程的缓存里,接着又从应用程序缓存里拷贝回操作系统的Socket缓存里。
而且为了进行这两次拷贝,中间还发生了好几次上下文切换,一会儿是应用程序在执行,一会儿上下文切换到操作系统来执行。
所以这种方式来读取数据是比较消耗性能的。
Kafka 为了解决这个问题,在读数据的时候是引入零拷贝技术。
也就是说,直接让操作系统的cache中的数据发送到网卡后传出给下游的消费者,中间跳过了两次拷贝数据的步骤,Socket缓存中仅仅会拷贝一个描述符过去,不会拷贝数据到Socket缓存。
体会一下这个精妙的过程吧
通过零拷贝技术,就不需要把os cache里的数据拷贝到应用缓存,再从应用缓存拷贝到Socket缓存了,两次拷贝都省略了,所以叫做零拷贝。
对Socket缓存仅仅就是拷贝数据的描述符过去,然后数据就直接从os cache中发送到网卡上去了,这个过程大大的提升了数据消费时读取文件数据的性能。
而且大家会注意到,在从磁盘读数据的时候,会先看看os cache内存中是否有,如果有的话,其实读数据都是直接读内存的。
如果kafka集群经过良好的调优,大家会发现大量的数据都是直接写入os cache中,然后读数据的时候也是从os cache中读。
相当于是Kafka完全基于内存提供数据的写和读了,所以这个整体性能会极其的高。