优质博文:IT-BLOG-CN

一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。Kafka的基本存储单位是分区。在配置Kafka的时候,管理员指定了一个用于存储分区的目录清单log.dirs参数的值。

一、分区分配

创建主题时,Kafka首先决定如何在broker之间分配分区。假设有6broker,打算创建一个包含10个分区的主题。并且复制系数是3,相当于30个分区副本。在被分配到6broker上时,要达到如下的目标:
【1】在broker间平均分配分区副本。对于上述例子来说,就是要保证每个broker可以分到5个副本。
【2】确保每个分区的每个副本分布在不同的broker上。
【3】如果为broker指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的broker上。这样做是为了保证一个机架不可用不会导致整个分区不可用。

为了实现这个目标,我们先随机选择一个broker(假设是2),然后通过轮询给每个broker分配分区来确定首领的位置。如果分区0的首领在broker2上,那么分区1的首领就在broker3上,以此类推。然后,从分区首领开始,以此分配跟随者副本。如分区0首领在broker2上,那么它的第一个副本会出现在broker3上,第二个出现在 broker4上。如果配置了机架信息,那么就不是按照数字顺序来选择broker了,而是按照交替机架的方式来选择broker。假设broker0broker1broker2放在同一个机架,broker3broker4broker5放在其他不同的机架。此时就不是按照05的顺序来选择broker,而是按照0,3,1,4,2,5的顺序进行选择的。

二、文件管理

保留数据时Kafka的一个基本特性,Kafka不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反,Kafka管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。 因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。默认情况下,index大小为10M,每个片段log包含1GB或一周数据,以较小的那个为准。当前正在写入数据的片段叫做活跃片段,活跃片段永远不会被删除。

三、文件格式

我们把Kafka的消息和偏移量保存在文件里。保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的。因为使用相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压缩。除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳可以是生产者发送消息的时间,也可以是消息到达broker的时间,这个是可配置的。如果生产者发送的是压缩过的消息,那么同一个批次的消息会被再压缩一次,被当做包装消息进行发送。下面是普通消息和包装消息图:

四、文件存储机制

【1】Broker 消息中间件处理结点,一个Kafka节点就是一个Broker,多个Broker可以组成一个Kafka集群。
【2】Topic 主题,如page view日志、click日志等都可以以Topic的形式存在,Kafka集群能够同时负责多个Topic的分发。
【3】Partition Topic物理上的分组,一个Topic可以分为多个Partition,每个Partition是一个有序的队列。
【4】Segment Partition物理上由多个Segment组成。
【5】offset 每个 Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。Partition中的每个消息都有一个连续的序列号叫做offset,用于Partition唯一标识一条消息。

分析过程分为以下4个步骤:

【1】TopicPartition存储分布: 假设Kafka集群只有一个Brokerxxx/message-folder为数据文件存储根目录,在Kafka Brokerserver.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2Topic名称分别为report_pushlaunch_infoPartitions数量都为partitions=4(将一个Topic分为4个部分存储)存储路径和目录规则为:

xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3

【2】Partiton中文件存储方式: 每个Partion(目录)相当于一个巨型文件被平均分配到多个大小相等Segment(段)数据文件中。但每个段Segment file消息数量不一定相等,这种特性方便old segment file快速被删除。每个Partiton只需要支持顺序读写就行了,Segment文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

【3】partitonsegment文件存储结构:segment file2大部分组成,分别为index filedata file,此2个文件成对出现,后缀".index"“.log”分别表示为segment索引文件、数据文件。segment文件命名规则:partion全局的第一个segment0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64long大小,19位数字字符长度,没有数字用0填充。下面文件列表是笔者在Kafka broker上做的一个实验,创建一个topicXXX包含1 partition,设置每个segment大小为500MB,并启动producerKafka broker写入大量数据,如下图所示segment文件列表形象说明了上述2个规则以及segmentindex<—->data file对应关系物理结构如下:

索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。 其中以索引文件中元数据3,497为例,依次在数据文件中表示第3message(在全局partiton表示第368772message)、以及该消息的物理偏移地址为497。从上述图3了解到segment data file由许多 message组成,下面详细说明message物理结构如下:

【参数说明】:
8 byte offset:在Parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移offset,它可以唯一确定每条消息在Parition内的位置。即offset表示Partiion的第多少message
4 byte message sizemessage大小;
4 byte CRC32:用crc32校验message
1 byte “magic":表示本次发布Kafka服务程序协议版本号;
1 byte “attributes":表示为独立版本、或标识压缩类型、或编码类型;
4 byte key length:表示key的长度,当key-1时,K byte key字段不填;
value bytes payload:表示实际消息数据;

`index文件结构:

offset: 783932 position: 69483992
offset: 784323 position: 69543233
offset: 784565 position: 69589443
offset: 784932 position: 69623433
offset: 785355 position: 69658994
offset: 785894 position: 69704355
offset: 786389 position: 69738993
offset: 786584 position: 69784345

log文件结构: 有个眼缘即可

offset: 784932 CreateTime:1598161852389 keysize: -1 valuesize: 15 sequence: 9884 baseOffset: 7043213 lastOffset: 784932 count: 1 baseSequence: 907

【4】在partition中如何通过offset查找message 例如读取offset=368776Message,需要通过下面2个步骤查找。
【第一步】查找segment file 上图为例,其中00000000000000000000.index表示最开始的文件,起始偏移量offset为0。第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1。同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset二分查找文件列表,就可以快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log
【第二步】通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index 的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。从上述图可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。通过上述过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。

五、Kafka文件实际运行效果

【实验环境】:Kafka集群 = 由2台虚拟机组成;CPU = 4核;物理内存= 8GB;网卡 = 千兆网卡;JVM HEAP = 4GB;详细Kafka服务端配置及其优化请参考:Kafka server.properties配置详解:

从上述图可以看出,Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:写message,消息从java堆转入page cache即物理内存。由异步线程刷盘,消息从page cache刷入磁盘。读message消息直接从page cache转入socket发送出去。当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去。

六、Kafka 中的 Partition 和 Offset

【1】Log机制: 说到分区,就要说Kafka对消息的存储,首先,kafka是通过log(日志)来记录消息发布的。每当产生一个消息,Kafka会记录到本地的log文件中,这个log和我们平时的log有一定的区别。这里可以参考一下The Log,不多解释。这个log文件默认的位置在config/server.properties中指定的,默认的位置是log.dirs=/tmp/kafka-logsLinux不用说,windows的话就在你对应磁盘的根目录下。

分区Partition Kafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小。因此有了Partition的概念。Kafka对消息进行一定的计算,通过hash来进行分区。这样,就把一份log文件分成了多份。如上面的分区读写日志图,分成多份以后,在单台Broker上,比如快速上手中,如果新建Topic的时候,我们选择replication-factor 1 partitions 2,那么在log目录里,我们会看到test-0目录和test-1目录就是两个分区了。你可能会想,这没啥区别呀。注意,当有了多个broker之后,这个意义就存在了。这里上一张图:

【2】Kafka分布式分区存储: 这是一个Topic包含4Partition2 Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法。将总共8份数据,分配到Broker集群上。结果就是每个Broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用。比如图中的Broker1,宕机了那么剩下的三台Broker依然保留了全量的分区数据。所以还能使用,如果再宕机一台,那么数据不完整了。当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行。需要在存储占用和高可用之间做衡量。至于宕机后,zookeeper会选出新的 partition leader

偏移offset 上一段说了分区,分区就是一个有序的,不可变的消息队列。新来的commit log持续往后面加数据。这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息。消费者消费到了哪条消息,是保持在消费者这一端的。消息者也可以控制,消费者可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset。也可以重置offset

如何通过offset算出分区:其实Partition存储的时候,又分成了多个segment(段),然后通过一个index索引,来标识第几段。这里先可以去看一下本地log目录的分区文件夹。在我这里,test-0这个分区里面,会有一个index文件和一个log文件,对于某个指定的分区,假设每5个消息作为一个段大小,当产生了10条消息的情况下,目前有会分段。
0.index(表示这里index是对0-4做的索引)、5.index (表示这里index是对5-9做的索引)、10.index (表示这里index是对10-15做的索引,目前还没满) 和0.log5.log10.log。当消费者需要读取offset=8的时候,首先kafkaindex文件列表进行二分查找,可以算出应该是在5.index对应的log文件中,然后对对应的5.log文件,进行顺序查找,5->6->7->8,直到顺序找到8就好了。

七、索引

消费者可以从Kafka的任意可用偏移量位置开始读取消息,假设消费者要读取从偏移量100开始的1MB消息,那么Broker必须立即定位到偏移量100,为了帮组broker更快地定位到指定的偏移量,Kafka为每个分区维护一个索引。索引把偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以再删除消息时,也可以删除相应的索引。Kafka不维护索引的校验和。如果索引出现损坏,Kafka会通过重新读取消息并录制偏移量和位置来重新生成索引。如果有必要,管理员是可以删除索引的,这样做是绝对安全的,Kafka会自动重新生成这些索引。

八、Kafka高效文件存储设计特点

【1】Kafkatopic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
【2】通过索引信息可以快速定位message和确定response的最大大小。
【3】通过index元数据全部映射到memory,可以避免segment fileIO磁盘操作。
【4】通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐