今でもあなたは私の光丶

Kafka(6)物理存储

日志存储概述

Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题又可以分为一个或多个分区。
每个分区各自存在一个记录消息数据的日志文件。

图中,创建了一个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在一个 [Topic-Parition] 命名的消息日志文件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负 载均衡的效果。在分区日志文件中,你会发现很多类型的文件,比 如: .index、.timestamp、.log、.snapshot 等。

其中,文件名一致的文件集合就称为 LogSement。

LogSegment

  1. 分区日志文件中包含很多的 LogSegment
  2. Kafka 日志追加是顺序写入的
  3. LogSegment 可以减小日志文件的大小
  4. 进行日志删除的时候和数据查找的时候可以快速定位。
  5. ActiveLogSegment 是活跃的日志分段,拥有文件拥有写入权限,其余的 LogSegment 只有 只读的权限。

日志文件存在多种后缀文件,重点需要关注 .index、.timestamp、.log 三种类型。

类别作用

每个 LogSegment 都有一个基准偏移量,表示当前 LogSegment 中第一条消息的 offset。

偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志 文件都由该作为文件名命名规则(00000000000000000000.index、 00000000000000000000.timestamp、00000000000000000000.log)。
如果日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121(偏移量从 0 开始)。

日志与索引文件

配置项默认值说明

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。
时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,并不保证每一个消息在索引文件中都有 对应的索引项。
每当写入一定量的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引项和时间戳 索引项。
通过修改 log.index.interval.bytes 的值,改变索引项的密度。

切分文件

当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。 log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.ms log.roll.hours 参数配置的值。如果同时配置了 log.roll.mslog.roll.hours 参 数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值 为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。 log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE ,即要追 加的消息的偏移量不能转变为相对偏移量。

为什么是 Integer.MAX_VALUE

1024 * 1024 * 1024=1073741824
在偏移量索引文件中,每个索引项共占用 8 个字节,并分为两部分。
相对偏移量和物理地址。
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节刚好对应 Integer.MAX_VALUE ,如果大于 Integer.MAX_VALUE ,则不能用 4 个字节 进行表示了。

索引文件切分过程

索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件创建的时候就是最大 值
当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。
这一点是跟日志文件有所区别的地方。其意义降低了代码逻辑的复杂性。

日志存储

索引

偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查 找对应的偏移量

文件:

查看一个topic分区目录下的内容,发现有log、index和timeindex三个文件:

  1. log文件名是以文件中第一条message的offset来命名的,实际offset长度是64位,但是这里 只使用了20位,应付生产是足够的。
  2. 一组index+log+timeindex文件的名字是一样的,并且log文件默认写满1G后,会进行log rolling形成一个新的组合来记录消息,这个是通过broker端 log.segment.bytes =1073741824指定的。
  3. index和timeindex在刚使用时会分配10M的大小,当进行 log rolling 后,它会修剪为实际 的大小。

1、创建主题:

[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --
topic tp_demo_05 --partitions 1 --replication-factor 1 --config
segment.bytes=104857600

2、创建消息文件:

[root@node1 ~]# for i in `seq 10000000`; do echo "hello lagou $i" >> nmm.txt;
done

3、将文本消息生产到主题中:

[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic
tp_demo_05 <nmm.txt

4、查看存储文件:

如果想查看这些文件,可以使用kafka提供的shell来完成,几个关键信息如下:

  1. offset是逐渐增加的整数,每个offset对应一个消息的偏移量。
  2. position:消息批字节数,用于计算物理地址。
  3. CreateTime:时间戳。
  4. magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
  5. compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1- GZIP、2-snappy、3-lz4。
  6. crc:对所有字段进行校验后的crc值。
[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments -
-files 00000000000000000000.log --print-data-log | head
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 716 baseSequence: -1 lastSequence: -1 producerId:
-1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false
position: 0 CreateTime: 1596513421661 isvalid: true size: 16380 magic: 2
compresscodec: NONE crc: 2973274901
baseOffset: 717 lastOffset: 1410 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 16380 CreateTime: 1596513421715 isvalid: true size: 16371
magic: 2 compresscodec: NONE crc: 1439993110
baseOffset: 1411 lastOffset: 2092 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 32751 CreateTime: 1596513421747 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 3528903590
baseOffset: 2093 lastOffset: 2774 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 49116 CreateTime: 1596513421791 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 763876977
baseOffset: 2775 lastOffset: 3456 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 65481 CreateTime: 1596513421795 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 2218198476
baseOffset: 3457 lastOffset: 4138 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 81846 CreateTime: 1596513421798 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 4018065070
baseOffset: 4139 lastOffset: 4820 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 98211 CreateTime: 1596513421802 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 3073882858
baseOffset: 4821 lastOffset: 5502 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 114576 CreateTime: 1596513421819 isvalid: true size: 16365
magic: 2 compresscodec: NONE crc: 207330377
[root@node1 tp_demo_05-0]#

关于消息偏移量:

一、消息存储

  1. 消息内容保存在log日志文件中。
  2. 消息封装为Record,追加到log日志文件末尾,采用的是顺序写模式。
  3. 一个topic的不同分区,可认为是queue,顺序写入接收到的消息。

消费者有offset。下图中,消费者A消费的offset是9,消费者B消费的offset是11,不同的消费者 offset是交给一个内部公共topic来记录的。

(3)时间戳索引文件,它的作用是可以让用户查询某个时间段内的消息,它一条数据的结构是时 间戳(8byte)+相对offset(4byte),如果要使用这个索引文件,首先需要通过时间范围,找到对应 的相对offset,然后再去对应的index文件找到position信息,然后才能遍历log文件,它也是需要使用 上面说的index文件的。

但是由于producer生产消息可以指定消息的时间戳,这可能将导致消息的时间戳不一定有先后顺 序,因此尽量不要生产消息时指定时间戳。

偏移量

  1. 位置索引保存在index文件中
  2. og日志默认每写入4K(log.index.interval.bytes设定的),会写入一条索引信息到index文件 中,因此索引文件是稀疏索引,它不会为每条日志都建立索引信息。
  3. log文件中的日志,是顺序写入的,由message+实际offset+position组成
  4. 索引文件的数据结构则是由相对offset(4byte)+position(4byte)组成,由于保存的是相 对第一个消息的相对offset,只需要4byte就可以了,可以节省空间,在实际查找后还需要计 算回实际的offset,这对用户是透明的。

稀疏索引,索引密度不高,但是offset有序,二分查找的时间复杂度为O(lgN),如果从头遍历时间 复杂度是O(N)。

示意图如下:

偏移量索引由相对偏移量和物理地址组成。

可以通过如下命令解析 .index 文件

kafka-run-class.sh kafka.tools.DumpLogSegments --files
00000000000000000000.index --print-data-log | head

注意:offset 与 position 没有直接关系,因为会删除数据和清理日志。

[root@node1 tp_demo_05-0]# kafka-run-class.sh kafka.tools.DumpLogSegments -
-files 00000000000003925423.log --print-data-log | head
Dumping 00000000000003925423.log
Starting offset: 3925423
baseOffset: 3925423 lastOffset: 3926028 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 0 CreateTime: 1596513434779 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 4049330741
baseOffset: 3926029 lastOffset: 3926634 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 16359 CreateTime: 1596513434786 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 2290699169
baseOffset: 3926635 lastOffset: 3927240 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 32718 CreateTime: 1596513434787 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 368995405
baseOffset: 3927241 lastOffset: 3927846 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 49077 CreateTime: 1596513434788 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 143415655
baseOffset: 3927847 lastOffset: 3928452 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 65436 CreateTime: 1596513434789 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 572340120
baseOffset: 3928453 lastOffset: 3929058 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 81795 CreateTime: 1596513434790 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 1029643347
baseOffset: 3929059 lastOffset: 3929664 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 98154 CreateTime: 1596513434791 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 2163818250
baseOffset: 3929665 lastOffset: 3930270 baseSequence: -1 lastSequence: -1
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional:
false position: 114513 CreateTime: 1596513434792 isvalid: true size: 16359
magic: 2 compresscodec: NONE crc: 3747213735
[root@node1 tp_demo_05-0]#

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳 必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息元数据中存在若干的时 间戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间 戳必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的。因为数据的 写入是各自追加。

思考:如何查看偏移量为23的消息?

Kafka 中存在一个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在 00000000000000000000.index ,通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,然后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。

时间戳

在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳 必须大于之前追加的索引项,否则不予追加。在 Kafka 0.11.0.0 以后,消息信息中存在若干的时间 戳信息。如果 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳 必定能保持单调增长。反之如果是 CreateTime 则无法保证顺序。

通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前八个字节表示时间戳,后四个字节表示偏移量

思考:查找时间戳为 1557554753430 开始的消息?

  1. 查找该时间戳应该在哪个日志分段中。将1557554753430和每个日志分段中最大时间戳 largestTimeStamp逐一对比,直到找到不小于1557554753430所对应的日志分段。日志分段 中的largestTimeStamp的计算是:先查询该日志分段所对应时间戳索引文件,找到最后一条 索引项,若最后一条索引项的时间戳字段值大于0,则取该值,否则取该日志分段的最近修改 时间。
  2. 查找该日志分段的偏移量索引文件,查找该偏移量对应的物理地址。
  3. 日志文件中从 320 的物理位置开始查找不小于 1557554753430 数据。

注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的,因为数据的 写入是各自追加。

清理

Kafka 提供两种日志清理策略:

  • 日志删除:按照一定的删除策略,将不满足条件的数据进行数据删除
  • 日志压缩:针对每个消息的 Key 进行整合,对于有相同 Key 的不同 Value 值,只保留最后一个版 本。

Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值: delete ,还可以选择 compact

主题级别的配置项是 cleanup.policy

日志删除

基于时间

日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设 定日志保留的时间节点。如果超过该设定值,就需要进行删除。默认是 7 天, log.retention.ms 优 先级最高。

Kafka 依据日志分段中最大的时间戳进行定位。
首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后 一条索引项的时间戳字段值大于 0,则取该值,否则取最近修改时间。

为什么不直接选最近修改时间呢?
因为日志文件可以有意无意的被修改,并不能真实的反应日志分段的最大时间信息。

删除过程

  1. 从日志对象中所维护日志分段的跳跃表中移除待删除的日志分段,保证没有线程对这些日志分 段进行读取操作。
  2. 这些日志分段所有文件添加 上 .delete 后缀。
  3. 交由一个以 "delete-file" 命名的延迟任务来删除这些 .delete 为后缀的文件。延迟执行 时间可以通过 file.delete.delay.ms 进行设置

如果活跃的日志分段中也存在需要删除的数据时?

Kafka 会先切分出一个新的日志分段作为活跃日志分段,该日志分段不删除,删除原来的日志分 段。
先腾出地方,再删除。

基于日志大小

日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日 志分段的大小由 log.segment.bytes 进行设定。

删除过程

  1. 计算需要被删除的日志总大小 (当前日志文件大小(所有分段)减去retention值)。
  2. 从日志文件第一个 LogSegment 开始查找可删除的日志分段的文件集合。
  3. 执行删除。

基于偏移量

根据日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,若是,则可以 删除此日志分段。
注意:日志文件的起始偏移量并不一定等于第一个日志分段的基准偏移量,存在数据删除,可能与 之相等的那条数据已经被删除了。

删除过程

  1. 从头开始遍历每个日志分段,日志分段1的下一个日志分段的起始偏移量为21,小于 logStartOffset,将日志分段1加入到删除队列中
  2. 日志分段 2 的下一个日志分段的起始偏移量为35,小于 logStartOffset,将 日志分段 2 加入 到删除队列中
  3. 日志分段 3 的下一个日志分段的起始偏移量为57,小于logStartOffset,将日志分段3加入删 除集合中
  4. 日志分段4的下一个日志分段的其实偏移量为71,大于logStartOffset,则不进行删除。

日志压缩策略

1. 概念

日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留,而不是基于粗粒度的基于时间的 保留。
对于具有相同的Key,而数据不同,只保留最后一条数据,前面的数据在合适的情况下删除。

2. 应用场景

日志压缩特性,就实时计算来说,可以在异常容灾方面有很好的应用途径。比如,我们在Spark、 Flink中做实时计算时,需要长期在内存里面维护一些数据,这些数据可能是通过聚合了一天或者一周的 日志得到的,这些数据一旦由于异常因素(内存、网络、磁盘等)崩溃了,从头开始计算需要很长的时 间。一个比较有效可行的方式就是定时将内存里的数据备份到外部存储介质中,当崩溃出现时,再从外 部存储介质中恢复并继续计算。
使用日志压缩来替代这些外部存储有哪些优势及好处呢?这里为大家列举并总结了几点:

  • Kafka即是数据源又是存储工具,可以简化技术栈,降低维护成本
  • 使用外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使用这些Key将数据取 回,实现起来有一定的工程难度和复杂度。使用Kafka的日志压缩特性,只需要把数据写进 Kafka,等异常出现恢复任务时再读回到内存就可以了
  • Kafka对于磁盘的读写做了大量的优化工作,比如磁盘顺序读写。相对于外部存储介质没有索 引查询等工作量的负担,可以实现高性能。同时,Kafka的日志压缩机制可以充分利用廉价的 磁盘,不用依赖昂贵的内存来处理,在性能相似的情况下,实现非常高的性价比(这个观点仅 仅针对于异常处理和容灾的场景来说)

2.3 日志压缩方式的实现细节

主题的 cleanup.policy 需要设置为compact。

Kafka的后台线程会定时将Topic遍历两次:

  1. 记录每个key的hash值最后一次出现的偏移量
  2. 第二次检查每个offset对应的Key是否在后面的日志中出现过,如果出现了就删除对应的日 志。

日志压缩允许删除,除最后一个key之外,删除先前出现的所有该key对应的记录。在一段时间后从 日志中清理,以释放空间。

注意:日志压缩与key有关,确保每个消息的key不为null。

压缩是在Kafka后台通过定时重新打开Segment来完成的,Segment的压缩细节如下图所示:

日志压缩可以确保:

任何保持在日志头部以内的使用者都将看到所写的每条消息,这些消息将具有顺序偏移量。可 以使用Topic的min.compaction.lag.ms属性来保证消息在被压缩之前必须经过的最短时间。 也就是说,它为每个消息在(未压缩)头部停留的时间提供了一个下限。可以使用Topic的 max.compaction.lag.ms属性来保证从收到消息到消息符合压缩条件之间的最大延时

  • 消息始终保持顺序,压缩永远不会重新排序消息,只是删除一些而已
  • 消息的偏移量永远不会改变,它是日志中位置的永久标识符
  • 从日志开始的任何使用者将至少看到所有记录的最终状态,按记录的顺序写入。另外,如果使 用者在比Topic的log.cleaner.delete.retention.ms短的时间内到达日志的头部,则会看到已 删除记录的所有delete标记。保留时间默认是24小时。

默认情况下,启动日志清理器,若需要启动特定Topic的日志清理,请添加特定的属性。配置日志 清理器,这里为大家总结了以下几点:

  1. log.cleanup.policy 设置为 compact ,Broker的配置,影响集群中所有的Topic
  2. log.cleaner.min.compaction.lag.ms ,用于防止对更新超过最小消息进行压缩,如果没 有设置,除最后一个Segment之外,所有Segment都有资格进行压缩
    • log.cleaner.max.compaction.lag.ms ,用于防止低生产速率的日志在无限制的时间内不 压缩。

Kafka的日志压缩原理并不复杂,就是定时把所有的日志读取两遍,写一遍,而CPU的速度超过磁 盘完全不是问题,只要日志的量对应的读取两遍和写入一遍的时间在可接受的范围内,那么它的性能就 是可以接受的。

磁盘存储

零拷贝

kafka高性能,是多方面协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“无 所不用其极”的高效利用磁盘/操作系统特性。
零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数。通常是说在IO读写过程中。
nginx的高性能也有零拷贝的身影。

传统IO

比如:读取文件,socket发送
传统方式实现:先读取、再发送,实际经过1~4四次copy。

buffer = File.read
Socket.send(buffer)
  1. 第一次:将磁盘文件,读取到操作系统内核缓冲区;
  2. 第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
  3. 第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统 内核的缓冲区);
  4. 第四次:将socket buffer的数据,copy到网络协议栈,由网卡进行网络传输。

实际IO读写,需要进行IO中断,需要CPU响应中断(内核态到用户态转换),尽管引入DMA(Direct Memory Access,直接存储器访问)来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的
实际上并不需要第二个和第三个数据副本。数据可以直接从读缓冲区传输到套接字缓冲区。

kafka的两个过程:

  1. 网络数据持久化到磁盘 (Producer 到 Broker)
  2. 磁盘文件通过网络发送(Broker 到 Consumer)

数据落盘通常都是非实时的,Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分 页存储来利用内存提高I/O效率。

磁盘文件通过网络发送(Broker 到 Consumer)

磁盘数据通过DMA(Direct Memory Access,直接存储器访问)拷贝到内核态 Buffer
直接通过 DMA 拷贝到 NIC Buffer(socket buffer),无需 CPU 拷贝。
除了减少数据拷贝外,整个读文件 ==> 网络发送由一个 sendfile 调用完成,整个过程只有两次上 下文切换,因此大大提高了性能。

Java NIO对sendfile的支持就是FileChannel.transferTo()/transferFrom()。
fileChannel.transferTo( position, count, socketChannel);
把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是 sendfile。消费者从broker读取数据,就是由此实现。

具体来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 通过 Java NIO 的 FileChannel 的 transferTo 和 transferFrom 方法实现零拷贝。

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝,需要操作系统支持。
Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。

页缓存

页缓存是操作系统实现的一种主要的磁盘缓存,以此用来减少对磁盘 I/O 的操作。
具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。

Kafka接收来自socket buffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使 用mmap内存文件映射。
Memory Mapped Files
简称mmap,简单描述其作用就是:将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文 件。
它的工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你 对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的 I/O提升,省去了用户空间到内核空间复制的开销。

mmap也有一个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统 会在程序主动调用flush的时候才把数据真正的写到硬盘。
Kafka提供了一个参数 producer.type 来控制是不是主动flush;
如果Kafka写入到mmap之后就立即flush然后再返回Producer叫同步(sync);
写入mmap之后立即返回Producer不调用flush叫异步(async)。

Java NIO对文件映射的支持
Java NIO,提供了一个MappedByteBuffer 类可以用来实现内存映射。
MappedByteBuffer只能通过调用FileChannel的map()取得,再没有其他方式。
FileChannel.map()是抽象方法,具体实现是在 FileChannelImpl.map()可自行查看JDK源码,其 map0()方法就是调用了Linux内核的mmap的API。

使用 MappedByteBuffer类要注意的是

  • mmap的文件映射,在full gc时才会进行释放。当close时,需要手动清除内存映射文件,可 以反射调用sun.misc.Cleaner方法。

当一个进程准备读取磁盘上的文件内容时:

  1. 操作系统会先查看待读取的数据所在的页 (page)是否在页缓存(pagecache)中,如果存在(命 中)则直接返回数据,从而避免了对物理磁盘的 I/O 操作;
  2. 如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数 据返回给进程。

如果一个进程需要将数据写入磁盘:

  1. 操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的 页,最后将数据写入对应的页。
  2. 被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持 数据的一致性。

对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的 页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用Direct I/O的方式, 否则页缓存很难 被禁止。
当使用页缓存的时候,即使Kafka服务重启, 页缓存还是会保持有效,然而进程内的缓存却需要重 建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会 比进程内维护更加安全有效。

Kafka中大量使用了页缓存,这是 Kafka 实现高吞吐的重要因素之一。
消息先被写入页缓存,由操作系统负责刷盘任务。

顺序写入

操作系统可以针对线性读写做深层次的优化,比如预读(read-ahead,提前将一个比较大的磁盘块 读入内存) 和后写(write-behind,将很多小的逻辑写操作合并起来组成一个大的物理写操作)技术。

Kafka 在设计时采用了文件追加的方式来写入消息,即只能在日志文件的尾部追加新的消 息,并且 也不允许修改已写入的消息,这种方式属于典型的顺序写盘的操作,所以就算 Kafka 使用磁盘作为存储 介质,也能承载非常大的吞吐量。

mmap和sendfile:

  1. Linux内核提供、实现零拷贝的API;
  2. sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
  3. mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
  4. RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。

Kafka速度快是因为:

  1. partition顺序读写,充分利用磁盘特性,这是基础;
  2. Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
  3. Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到 socket buffer进行网络发送。