消息存储

RocketMQ 的存储设计是非常有创造性的。存储文件主要有三个:CommitLog、ConsumeQueue、Index。如下图:

CommitLog

RocketMQ 的消息保存在 CommitLog 中,CommitLog 每个文件 1G 大小。有趣的是,文件名并不叫 CommitLog,而是用消息的偏移量来命名。比如第一个文件文件名是 0000000000000000000,第二个文件文件名是 00000000001073741824,依次类推就可以得到所有文件的文件名。

有了上面的命名规则,给定一个消息的偏移量,就可以根据二分查找快速找到消息所在的文件,并且用消息偏移量减去文件名就可以得到消息在文件中的偏移量。

RocketMQ 写 CommitLog 时采用顺序写,大大提高了写入性能。

ConsumeQueue

这是一个目录,包含该Borker上所有的Topic对应的消费队列文件信息.消费队列的文件格式为./consumequeue/Topc名字/queue id/具体消费队列文件.每个消费队列其实是commitlog的一个索引,提供给消费者做拉取消息,更新位点使用.

如果直接从 CommitLog 中检索 Topic 中的一条消息,效率会很低,因为需要从文件的第一条消息开始依次查找。引入了 ConsumeQueue 作为 CommitLog 的索引文件,会让检索效率大增。

刚开始不理解 ConsumeQueue 和 MessageQueue 的区别,网上查了一些资料发现,每个ConsumeQueue对应一个上面介绍的 MessageQueue,MessageQueue 只是一个概念模型。

ConsumeQueue 中的元素内容如下:

  • 前 8 个字节记录消息在 CommitLog 中的偏移量。
  • 中间 4 个字节记录消息消息大小。
  • 最后 8 个字节记录消息中 tag 的 hashcode。

这个 tag 的作用非常重要,假如一个 Consumer 订阅了 TopicA,Tag1 和 Tag2,那这个 Consumer 的订阅关系如下图:

可以看到,这个订阅关系是一个 hash 类型的结构,key 是 Topic 名称,value 是一个 SubscriptionData 类型的对象,这个对象封装了 tag。

拉取消息时,首先从nameserver获取订阅关系,得到当前consumer所有订阅的tag的hashcode,集合codeset,然后从consumerqueue获取一条记录,判断最后8个字节的tag hashcode是否在codeset中以决定是否将该消息发送给consumer

index文件

RocketMQ 支持按照消息的属性查找消息,为了支持这个功能,RocketMQ 引入了 Index 索引文件。Index 文件有三部分组成,文件头 IndexHead、500万个 hash 槽和 2000 万个 Index 条目组成。

IndexHead

总共有 6 个元素组成,前两个元素表示当前这个 Index 文件中第一条消息和最后一条消息的落盘时间,第三、第四两个元素表示当前这个 Index 文件中第一条消息和最后一条消息在 CommitLog 文件中的物理偏移量,第五个元素表示当前这个 Index 文件中 hash 槽的数量,第六个元素表示当前这个 Index 文件中索引条目的个数。

查找的时候除了传入 key 还需要传入第一条消息和最后一条消息的落盘时间,这是因为 Index 文件名是时间戳命名的,传入落盘时间可以更加精确地定位 Index 文件。

Hash 槽

熟悉 Java 中 HashMap 的同学应该都比较熟悉 Hash 槽这个概念了,其实就是 Hash 结构的底层数组。Index 文件中的 Hash 槽有 500 万个数组元素,每个元素是 4 个字节 int 类型元素,保存当前槽下最新的那个 index 条目的序号。

这里 Hash 槽解决 Hash 冲突的方式是链表法,如下图:

Index 条目

每个 Index 条目中,key 的 hashcode 占 4 个字节,phyoffset 表示消息在 CommitLog 中的物理偏移量占 8 个字节,timediff 表示消息的落盘时间与 header 里的 beginTimestamp 的差值占 4 个字节,pre index no 占 4 个字节。

pre index no 保存的是当前的 Hash 槽中前一个 index 条目的序号,一般在 key 发生 Hash 冲突时才会有值,否则这个值就是 0,表示当前元素是 Hash 槽中第一个元素。
Index 条目中保存 timediff,是为了防止 key 重复。查找 key 时,在 key 相同的情况下, 如果传入的时间范围跟 timediff 不满足,则会查找 pre index no 这个条目。

通过上面的分析,我们可以总结一个通过 key 在 Index 文件中查找消息的流程,如下:

1.计算 key 的 hashcode;

2.根据 hashcode 在 Hash 槽中查找位置 s;

3.计算 Hash 槽在 Index 文件中位置 40+(s-1)*4;

4.读取这个槽的值,也就是Index条目序号 n;

5.计算该 index 条目在 Index 文件中的位置,公式:40 + 500万 4 + (n-1) 20;

6.读取这个条目,比较 key 的 hashcode 和 index 条目中 hashcode是否相同,以及 key 传入的时间范围跟 Index 条目中的 timediff 是否匹配。如果条件不符合,则查找 pre index no 这个条目,找到后,从 CommitLog 中取出消息。

config

这是一个目录保存了当前broker中全部的topic,订阅关系和消费进度,这些数据broker会定时从内存持久化到磁盘以便宕机后恢复

abort

borker是否异常关闭的标志,正常关闭时文件会被删除,异常关闭时不会.当broker重新启动时,根据是否异常宕机确定是否需要重新构建index索引等操作

checkpoint

brocker最近一次正常运行时的状态,比如最后一次正常刷盘时间,最后一次正确索引时间等

索引机制

messageid提取消息

因为meesageid就是利用borker(哪个服务器)+offset(文件的偏移量)生成的(这里msgid是指服务端的)所以很容易就找到对应的commitLog文件来读取消息

基于ConsumeQueue实现tag查询

consumeQueue实现tag查询

处理流程

  1. 消费者要获取tag20220101的消息,首先通过执行202201.hashcode取到hash值
  2. 在ConsumeQueue文件中查找hash(tag=99的offset数据
  3. 根据物理位置Offset道题对应的CommitLog文件中提取消息,因为可能会出现hash碰撞,再对某种数据以字符串的方式筛选20220101的消息
  4. 将20220101消息提取,封装为message对象返回

key的查询

每个index file文件包含文件头和,hash槽,索引数据,每个文件的hash槽个数,索引数据个数都是固定的.hash槽位可以通过broker启动参数maxHashSlitNum进行配置,默认值为500w,索引数据可以通过broker启动参数maxIndexNum进行配置,默认为500w*4=2000w,一个indexFile约为400mb

消息构成

commitLog目录下有多个CommitLog文件.其实CommitLog中只有一个文件,为了方便保存和读写被切分为多个子文件,所有子文件通过保存的第一个和最后一个消息的物理点位进行连接.每个commitlog 1个G

我们要提取某一个偏移量上的消息就可以直接提取.写入的总是最后一个commit

Last modification:November 12, 2023
如果觉得我的文章对你有用,请随意赞赏