消息存储
-
分布式列队有高可用的需求,所以需要持久化,避免消息丢失
-
生产者发送消息
-
mq收到消息,持久化之后,在存储中新增一条记录
-
返回ack给生产者
-
mq push消息给对应的消费者,然后等消费者返回ack
-
如果消息者在指定时间内返回ack,mq认为消费成功,进行第六步,如果mq指定时间内没收到,
则认为消息推送失败,会重试上一步的操作 -
mq删除消息
存储介质
-
关系型数据库DB
-
ActiveMQ就是选用jdbc来做消息持久化,但是关系型数据库在单表数据量达到千万级别,
io性能会出现瓶颈,而且可靠性依赖于DB,DB出现故障,消息就无法保存 -
文件系统
-
常见的Kafka/RocketMQ/RabbitMQ就是用文件系统做持久化,一般来说分为异步刷盘和同步刷盘,
除非机器本身或者磁盘挂掉,否则是不会出现无法持久化的故障的 -
结论
-
文件系统 > 关系型数据库DB
消息的存储和发送
-
消息存储
-
现在都是使用高性能磁盘,顺序读写速度可达到600M/S,而随机写只能100KB/S,
所以RocketMQ消息是使用顺序写,保证消息存储的速度 -
消息发送
-
Rocket采用了零拷贝的技术,提高了消息存盘和网络发送的速度(限制是一次最多只能映射1.5G-2G的文件,所以RocketMQ默认单个CommitLog日志数据文件为1G)
-
零拷贝
- 原始操作:数据从磁盘到内核态内存,再复制到用户态内存,再复制到网络驱动的内核态内存,最后复制到网卡中进行传输
- 零拷贝:数据从磁盘复制到内核态内存,直接复制到网卡进行发送,避免不必要的cpu拷贝和上下文切换以及内存空间的浪费
消息存储结构
-
消息存储基本依靠两个文件配合完成的(简直和kafka一样)
-
CommitLog:消息的物理存储文件
-
ConsumeQueue:消息的索引文件,存的是指向物理存储的地址
-
Topic下的每个MessageQueue都有一个对应的ConsumeQueue文件
-
还有一个额外的文件IndexFile:为了消息查询提供一种通过key或时间区间来查询消息的方法,
通过IndexFile来查找消息的方法不影响发送和消费消息的主流程
刷盘机制
-
消息通过Producer写入RocketMQ的时候,有两种写磁盘方式
-
同步刷盘:消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回写入成功的状态
-
异步刷盘:消息写入内存的PAGECACHE后,立刻返回了成功的状态,等到内存里积累到一定程度时,统一触发写磁盘动作(和mysql的写入是一样的)
-
可以在配置文件的flushDiskType配置,ASYN_FLUSH(异步刷盘),SYNC_FLUSH(同步刷盘)
-
优缺点
-
同步刷盘保证消息不丢失,但是效率慢
-
异步刷盘吞吐量高,但是可能导致消息丢失