今でもあなたは私の光丶

RocketMQ(2)RocketMQ高级特性及原理(一)

消费发送

生产者向消息队列里写入消息,不同的业务场景需要生产者采用不同的写入策略。比如同步发送、 异步发送、Oneway发送、延迟发送、发送事务消息等。 默认使用的是DefaultMQProducer类,发送 消息要经过五个步骤:

1)设置Producer的GroupName。

2)设置InstanceName,当一个Jvm需要启动多个Producer的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。

3)设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证 不丢消息,可以设置多重试几次

4)设置NameServer地址

5)组装消息并发送。

消息发生返回状态(SendResult#SendStatus)有如下四种:
FLUSH_DISK_TIMEOUT
FLUSH_SLAVE_TIMEOUT
SLAVE_NOT_AVAILABLE
SEND_OK

不同状态在不同的刷盘策略和同步策略的配置下含义是不同的:

  1. FLUSH_DISK_TIMEOUT:表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成 SYNC_FLUSH才会报这个错误)。
  2. FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式, 没有在设定时间内完成主从同步。
  3. SLAVE_NOT_AVAILABLE:这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主 备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
  4. SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是 否被同步到了Slave上?消息在Slave上是否被写入磁盘?需要结合所配置的刷盘策略、主从策 略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。

写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理 逻辑。

提升写入的性能

发送一条消息出去要经过三步

  1. 客户端发送请求到服务器。
  2. 服务器处理该请求。
  3. 服务器向客户端返回应答

一次消息的发送耗时是上述三个步骤的总和。

在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用, 可以采用Oneway方 式发送
Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返 回结果。
用这种方式发送消息的耗时可以缩短到微秒级。

另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,
我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗 口内消息可以并发地写入DirectMem中,然后异步地将连续一段无空洞的数据刷入文件系统当中。
顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。

目前在阿里内部经过调优的服务器上,写入性能达到90万+的TPS,我们可以参考这个数据进行系 统优化。
在Linux操作系统层级进行调优,推荐使用EXT4文件系统,IO调度算法使用deadline算法。

消息消费

简单总结消费的几个要点:

  1. 消息消费方式(Pull和Push)
  2. 消息消费的模式(广播模式和集群模式)
  3. 流量控制(可以结合sentinel来实现,后面单独讲)
  4. 并发线程数设置
  5. 消息的过滤(Tag、Key) TagA||TagB||TagC * null

当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看 消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。

  1. 提高消费并行度
    在同一个ConsumerGroup下(Clustering方式),可以通过增加Consumer实例的数量来提 高并行度。
    通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。 注意:总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收 不到消息。
    此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加 并行度来提高吞吐量(设置方法是修改consumeThreadMin和consumeThreadMax)。
  2. 以批量方式进行消费
    某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中 涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。
    可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的 consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次 收到的是个长度为N的消息链表。
  3. 检测延时情况,跳过非重要消息
    Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆 积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度。

消息存储

存储介质

关系型数据库DB

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式 来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如 Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方 案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

文件系统

目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟 机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消 息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘 挂了,否则一般是不会出现无法持久化的故障问题。

性能对比

文件系统>关系型数据库DB

消息的存储和发送

1) 消息存储

目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。
但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!
因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。
RocketMQ的消息用顺序写,保证了消息存储的速度。

2) 存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件 是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储 的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

消息存储架构图中主要有下面三个跟消息存储相关的文件构成。

(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消 息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移 量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏 移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文 件;

(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能
RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行
如果要遍历commitlog文件根据topic检索消息是非常低效。
Consumer即可根据ConsumeQueue来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:

  1. 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
  2. 消息大小size
  3. 消息Tag的HashCode值。

consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组 织方式如下:

topic/queue/file三层组织结构
具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

consumequeue文件采取定长设计,每个条目共20个字节,分别为:

  1. 8字节的commitlog物理偏移量
  2. 4字节的消息长度
  3. 8字节tag hashcode

单个文件由30W个条目组成,可以像数组一样随机访问每一个条目
每个ConsumeQueue文件大小约5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。

  1. Index文件的存储位置是: $HOME/store/index/${fileName}
  2. 文件名fileName是以创建时的时间戳命名的
  3. 固定的单个IndexFile文件大小约为400M
  4. 一个IndexFile可以保存 2000W个索引
  5. ndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底 层实现为hash索引。

过滤消息

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时 再做消息过滤的。

RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实 现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然 后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。

其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基 于Tag的消息过滤正式基于这个字段值的。

主要支持如下2种的过滤方式

(1) Tag过滤方式:Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多 个TAG,可以用||分隔。

  1. Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给 Broker端。
  2. Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个 MessageFilter,然后传给Store。
  3. Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤
  4. 在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在消息消费端拉 取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消 息消费。

(2) SQL92的过滤方式:

仅对push的消费者起作用。
Tag方式虽然效率高,但是支持的过滤逻辑比较简单。
SQL表达式可以更加灵活的支持复杂过滤逻辑,这种方式的大致做法和上面的Tag过滤方式一样, 只是在Store层的具体过滤过程不太一样
真正的 SQL expression 的构建和执行由rocketmq-filter模块负责的。
每次过滤都去执行SQL表达式会影响效率,所以RocketMQ使用了BloomFilter避免了每次都去执 行。
SQL92的表达式上下文为消息的属性。

conf/broker.conf

首先需要开启支持SQL92的特性,然后重启broker:

mqbroker -n localhost:9876 -c /opt/rocket/conf/broker.conf

RocketMQ仅定义了几种基本的语法,用户可以扩展:

  1. 数字比较: >, >=, <, <=, BETWEEN, =
  2. 字符串比较: =, <>, IN; IS NULL或者IS NOT NULL;
  3. 逻辑比较: AND, OR, NOT;
  4. Constant types are: 数字如:123, 3.1415; 字符串如:'abc',必须是单引号引起来 NULL,特 殊常量 布尔型如:TRUE or FALSE;

(3) Filter Server方式。这是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据 Java函数的逻辑对消息进行过滤。

要使用Filter Server,首先要在启动Broker前在配置文件里加上 filterServer-Nums=3 这样的配 置,Broker在启动的时候,就会在本机启动3个Filter Server进程。Filter Server类似一个RocketMQ的 Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消 息再传给远端的Consumer。

这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过 检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。

零拷贝原理

PageCache

  • 由内存中的物理page组成,其内容对应磁盘上的block。
  • page cache的大小是动态变化的。
  • backing store: cache缓存的存储设备
  • 一个page通常包含多个block, 而block不一定是连续的。

读Cache

  • 当内核发起一个读请求时, 先会检查请求的数据是否缓存到了page cache中。
    • 如果有,那么直接从内存中读取,不需要访问磁盘, 此即 cache hit(缓存命中)
    • 如果没有, 就必须从磁盘中读取数据, 然后内核将读取的数据再缓存到cache中, 如此 后续的读请求就可以命中缓存了
  • page可以只缓存一个文件的部分内容, 而不需要把整个文件都缓存进来。

写Cache

  • 当内核发起一个写请求时, 也是直接往cache中写入, 后备存储中的内容不会直接更新。
  • 内核会将被写入的page标记为dirty, 并将其加入到dirty list中。
  • 内核会周期性地将dirty list中的page写回到磁盘上, 从而使磁盘上的数据和内存中缓存的数据 一致。

cache回收

  • Page cache的另一个重要工作是释放page, 从而释放内存空间。
  • cache回收的任务是选择合适的page释放
    • 如果page是dirty的, 需要将page写回到磁盘中再释放。

cache和buffer的区别

1、 Cache:缓存区,是高速缓存,是位于CPU和主内存之间的容量较小但速度很快的存储器,因 为CPU的速度远远高于主内存的速度,CPU从内存中读取数据需等待很长的时间,而 Cache 保存着CPU刚用过的数据或循环使用的部分数据,这时从Cache中读取数据会更快,减少了 CPU等待的时间,提高了系统的性能。

Cache并不是缓存文件的,而是缓存块的(块是I/O读写最小的单元);Cache一般会用在I/O请求上, 如果多个进程要访问某个文件,可以把此文件读入Cache中,这样下一个进程获取CPU控制权并访问此 文件直接从Cache读取,提高系统性能

2、 Buffer:缓冲区,用于存储速度不同步的设备或优先级不同的设备之间传输数据;通过buffer 可以减少进程间通信需要等待的时间,当存储速度快的设备与存储速度慢的设备进行通信时, 存储慢的数据先把数据存放到buffer,达到一定程度存储快的设备再读取buffer的数据,在此 期间存储快的设备CPU可以干其他的事情。

Buffer:一般是用在写入磁盘的,例如:某个进程要求多个字段被读入,当所有要求的字段被读入 之前已经读入的字段会先放到buffer中。

HeapByteBuffer和DirectByteBuffer

HeapByteBuffer,是在jvm堆上面一个buffer,底层的本质是一个数组,用类封装维护了很多的 索引(limit/position/capacity等)。

DirectByteBuffer,底层的数据是维护在操作系统的内存中,而不是jvm里,DirectByteBuffer里维 护了一个引用address指向数据,进而操作数据。

HeapByteBuffer优点:内容维护在jvm里,把内容写进buffer里速度快;更容易回收

DirectByteBuffer优点:跟外设(IO设备)打交道时会快很多,因为外设读取jvm堆里的数据时, 不是直接读取的,而是把jvm里的数据读到一个内存块里,再在这个块里读取的,如果使用 DirectByteBuffer,则可以省去这一步,实现zero copy(零拷贝)

外设之所以要把jvm堆里的数据copy出来再操作,不是因为操作系统不能直接操作jvm内存,而是 因为jvm在进行gc(垃圾回收)时,会对数据进行移动,一旦出现这种问题,外设就会出现数据错乱的 情况

所有的通过allocate方法创建的buffer都是HeapByteBuffer.

堆外内存实现零拷贝

  1. 前者分配在JVM堆上(ByteBuffer.allocate()),后者分配在操作系统物理内存上 (ByteBuffer.allocateDirect(),JVM使用C库中的malloc()方法分配堆外内存);
  2. DirectByteBuffer可以减少JVM GC压力,当然,堆中依然保存对象引用,fullgc发生时也会回 收直接内存,也可以通过system.gc主动通知JVM回收,或者通过 cleaner.clean主动清理。 Cleaner.create()方法需要传入一个DirectByteBuffer对象和一个Deallocator(一个堆外内存 回收线程)。GC发生时发现堆中的DirectByteBuffer对象没有强引用了,则调用Deallocator 的run()方法回收直接内存,并释放堆中DirectByteBuffer的对象引用;
  3. 底层I/O操作需要连续的内存(JVM堆内存容易发生GC和对象移动),所以在执行write操作时 需要将HeapByteBuffer数据拷贝到一个临时的(操作系统用户态)内存空间中,会多一次额外 拷贝。而DirectByteBuffer则可以省去这个拷贝动作,这是Java层面的 “零拷贝” 技术,在 netty中广泛使用;
  4. MappedByteBuffer底层使用了操作系统的mmap机制,FileChannel#map()方法就会返回 MappedByteBuffer。DirectByteBuffer虽然实现了MappedByteBuffer,不过 DirectByteBuffer默认并没有直接使用mmap机制。

缓冲IO和直接IO

缓存IO

缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制 中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。
读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返 回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至 于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令。

缓存I/O的优点:

  1. 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;
  2. 可以减少读盘的次数,从而提高性能。

缓存I/O的缺点:

在缓存 I/O 机制中,DMA 方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直 接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输。数据在传输过程 中就需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作, 这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

直接IO

直接IO就是应用程序直接访问磁盘数据,而不经过内核缓冲区,这样做的目的是减少一次从内核缓 冲区到用户程序缓存的数据复制。比如说数据库管理系统这类应用,它们更倾向于选择它们自己的缓存 机制,因为数据库管理系统往往比操作系统更了解数据库中存放的数据,数据库管理系统可以提供一种 更加有效的缓存机制来提高数据库中数据的存取性能。

直接IO的缺点:如果访问的数据不在应用程序缓存中,那么每次数据都会直接从磁盘加载,这种直 接加载会非常缓慢。通常直接IO与异步IO结合使用,会得到比较好的性能。

下图分析了写场景下的DirectIO和BufferIO:

内存映射文件(Mmap)

在LINUX中我们可以使用mmap用来在进程虚拟内存地址空间中分配地址空间,创建和物理内存的 映射关系。

映射关系可以分为两种

  1. 文件映射 磁盘文件映射进程的虚拟地址空间,使用文件内容初始化物理内存。
  2. 匿名映射 初始化全为0的内存空间。

而对于映射关系是否共享又分为

  1. 私有映射(MAP_PRIVATE) 多进程间数据共享,修改不反应到磁盘实际文件,是一个copy-onwrite(写时复制)的映射方式。
  2. 共享映射(MAP_SHARED) 多进程间数据共享,修改反应到磁盘实际文件中。

因此总结起来有4种组合

  1. 私有文件映射 多个进程使用同样的物理内存页进行初始化,但是各个进程对内存文件的修改 不会共享,也不会反应到物理文件中
  2. 私有匿名映射 mmap会创建一个新的映射,各个进程不共享,这种使用主要用于分配内存 (malloc分配大内存会调用mmap)。 例如开辟新进程时,会为每个进程分配虚拟的地址空间, 这些虚拟地址映射的物理内存空间各个进程间读的时候共享,写的时候会copy-on-write。
  3. 共享文件映射 多个进程通过虚拟内存技术共享同样的物理内存空间,对内存文件 的修改会反 应到实际物理文件中,他也是进程间通信(IPC)的一种机制。
  4. 共享匿名映射 这种机制在进行fork的时候不会采用写时复制,父子进程完全共享同样的物理 内存页,这也就实现了父子进程通信(IPC).

mmap只是在虚拟内存分配了地址空间,只有在第一次访问虚拟内存的时候才分配物理内存。

在mmap之后,并没有在将文件内容加载到物理页上,只上在虚拟内存中分配了地址空间。当进程 在访问这段地址时,通过查找页表,发现虚拟内存对应的页没有在物理内存中缓存,则产生"缺页",由 内核的缺页异常处理程序处理,将文件对应内容,以页为单位(4096)加载到物理内存,注意是只加载缺 页,但也会受操作系统一些调度策略影响,加载的比所需的多。

直接内存读取并发送文件的过程

Mmap读取并发送文件的过程

Sendfile零拷贝读取并发送文件的过程

零拷贝(zero copy)小结

  1. 虽然叫零拷贝,实际上sendfile有2次数据拷贝的。第1次是从磁盘拷贝到内核缓冲区,第二次 是从内核缓冲区拷贝到网卡(协议引擎)。如果网卡支持 SG-DMA(The Scatter-Gather Direct Memory Access)技术,就无需从PageCache拷贝至 Socket 缓冲区;
  2. 之所以叫零拷贝,是从内存角度来看的,数据在内存中没有发生过拷贝,只是在内存和I/O设 备之间传输。很多时候我们认为sendfile才是零拷贝,mmap严格来说不算;
  3. Linux中的API为sendfile、mmap,Java中的API为FileChanel.transferTo()、 FileChannel.map()等;
  4. Netty、Kafka(sendfile)、Rocketmq(mmap)、Nginx等高性能中间件中,都有大量利用操 作系统零拷贝特性。