消息存储

 

  • 分布式列队有高可用的需求,所以需要持久化,避免消息丢失

  • 生产者发送消息

  • mq收到消息,持久化之后,在存储中新增一条记录

  • 返回ack给生产者

  • mq push消息给对应的消费者,然后等消费者返回ack

  • 如果消息者在指定时间内返回ack,mq认为消费成功,进行第六步,如果mq指定时间内没收到,
    则认为消息推送失败,会重试上一步的操作

  • mq删除消息

存储介质

  • 关系型数据库DB

  • ActiveMQ就是选用jdbc来做消息持久化,但是关系型数据库在单表数据量达到千万级别,
    io性能会出现瓶颈,而且可靠性依赖于DB,DB出现故障,消息就无法保存

  • 文件系统

  • 常见的Kafka/RocketMQ/RabbitMQ就是用文件系统做持久化,一般来说分为异步刷盘和同步刷盘,
    除非机器本身或者磁盘挂掉,否则是不会出现无法持久化的故障的

  • 结论

  • 文件系统 > 关系型数据库DB

消息的存储和发送

  • 消息存储

  • 现在都是使用高性能磁盘,顺序读写速度可达到600M/S,而随机写只能100KB/S,
    所以RocketMQ消息是使用顺序写,保证消息存储的速度

  • 消息发送

  • Rocket采用了零拷贝的技术,提高了消息存盘和网络发送的速度(限制是一次最多只能映射1.5G-2G的文件,所以RocketMQ默认单个CommitLog日志数据文件为1G)

  • 零拷贝

    • 原始操作:数据从磁盘到内核态内存,再复制到用户态内存,再复制到网络驱动的内核态内存,最后复制到网卡中进行传输
    • 零拷贝:数据从磁盘复制到内核态内存,直接复制到网卡进行发送,避免不必要的cpu拷贝和上下文切换以及内存空间的浪费

消息存储结构

 

  • 消息存储基本依靠两个文件配合完成的(简直和kafka一样)

  • CommitLog:消息的物理存储文件

  • ConsumeQueue:消息的索引文件,存的是指向物理存储的地址

  • Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件

  • 还有一个额外的文件IndexFile:为了消息查询提供一种通过key或时间区间来查询消息的方法,
    通过IndexFile来查找消息的方法不影响发送和消费消息的主流程

刷盘机制

 

  • 消息通过Producer写入RocketMQ的时候,有两种写磁盘方式

  • 同步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回写入成功的状态

  • 异步刷盘:消息写入内存的PAGECACHE后,立刻返回了成功的状态,等到内存里积累到一定程度时,统一触发写磁盘动作(和mysql的写入是一样的)

  • 可以在配置文件的flushDiskType配置,ASYN_FLUSH(异步刷盘),SYNC_FLUSH(同步刷盘)

  • 优缺点

  • 同步刷盘保证消息不丢失,但是效率慢

  • 异步刷盘吞吐量高,但是可能导致消息丢失