本文介绍了RocketMQ的文件存储原理,并且用MappedByteBuffer实现了一个简单的RocketMQ文件持久化和读取。本文适合希望进一部了解RocketMQ底层文件存储原理的开发者,学习本文需要对消息队列有一定的使用经验,对JavaNIO文件读写有一定的了解。 主要内容: 1。RocketMQ文件简介 2。RocketMQ文件结构说明 3。MappedByteBuffer简介 4。最精简的RocketMQ文件存储实现(干货)1。RocketMQ文件简介 RocketMQ具有其强大的存储能力和强大的消息索引能力,从众多消息中间件产品中脱颖而出,其原理很值得学习。 RocketMQ存储用的是本地文件存储系统,效率高也可靠。存储文件主要分为CommitLog,ConsumeQueue,Index三类文件。 CommitLog 消息存储文件,所有消息主题的消息都存储在CommitLog文件中。CommitLog单个文件大小默认1G,文件文件名是起始偏移量,总共20位,左边补零,起始偏移量是0。 比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824 ConsumeQueue 消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。 IndexFile 消息索引文件,主要存储消息Key与Offset的对应关系。 消息消费队列是RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息队列检索消息的速度 config文件夹 config文件夹中存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。 topics。json:topic配置属性 subscriptionGroup。json:消息消费组配置信息。 delayOffset。json:延时消息队列拉取进度。 consumerOffset。json:集群消费模式消息消进度。 consumerFilter。json:主题消息过滤信息。 几种文件的存储目录: RocketMQ文件目录 2。RocketMQ文件结构说明 RocketMQ文件逻辑图 ConsumeQueue ConsumeQueue文件保存在store目录下的consumequeue目录中。 ConsumeQueue每条数据占20字节空间,包含三部分内容:消息的offset、消息大小size、tag的hashCode。单个ConsumeQueue文件最多保存30W条数据。8byte (commitlogoffset)4byte (msgLength)8byte (tagCode) 一个topic会分成多个逻辑队列,每个逻辑队列对应一个ConsumeQueue文件,根据topic和queueId来组织文件,如果TopicA有两个队列0,1,那么TopicA和QueueId0组成一个ConsumeQueue,TopicA和QueueId1组成另一个ConsumeQueue。见RocketMQ文件逻辑图: CommitLog 消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。 文件的默认位置如下,仍然可通过配置文件修改: {user。home}store{commitlog}{fileName} CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。 message1 totalSize queueId queueOffset PhysicalOffset body topic 其它 message2 totalSize queueId queueOffset PhysicalOffset body topic 其它 message3 totalSize queueId queueOffset PhysicalOffset body topic 其它 字段说明: 单commitLog优点:对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。 缺点:写虽然完全是顺序写,但是读却变成了完全的随机读。读一条消息,会先读ConsumeQueue,再读CommitLog,增加了开销。要保证CommitLog与ConsumeQueue完全的一致,增加了编程的复杂度 Config的offsetTable。offset 和ConsumeQueue索引文件对应,这个offset是ConsumeQueue文件的(已经消费的)下标行数,可以直接定位到ConsumeQueue并找到commitlogOffset从而找到消息体原文, 这个offset是消息消费进度的核心 { offsetTable:{ zxptesttopiczxptestgroup2:{0:16,1:17,2:23,3:43 }, TopicTestpleaserenameuniquegroupname4:{0:250,1:250,2:250,3:250 }, RETRYzxptestgroup2zxptestgroup2:{0:3 } ordertopiczxptestgroup3:{0:0,1:3,2:3,3:3 } } } OffsetStore分为以下2种,分别存储在客户端和服务器端: 本地文件类型 BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地,因为每条消息会被消费组内所有的消费者消费,同消费组的消费者相互独立,消费进度要单独存储,会以文本文件的形式存储在客户端,对应的数据结构为LocalFileOffsetStore Broker代存储类型 在集群模式下,同一条消息只会被同一个消费组消费一次,消费进度会参与到负载均衡中,故消费进度是需要共享的,另外,消费者发生异常或重启为了保证可以从上一次消费的地方继续进行消费,这时的offset是统一保存到broker服务端的。对应的数据结构为RemoteBrokerOffsetStore。 IndexFile 用于为生成的索引文件提供访问服务,通过消息Key值查询消息真正的实体内容。在实际的物理存储上,文件名则是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存2000W个索引; indexFile存放的位置:{rocketmq。home}storeindexindexFile(年月日时分秒等组成文件名) 我们发送的消息体中,包含MessageKey或UniqueKey,那么就会给它们每一个都构建索引。根据消息Key计算Hash槽的位置根据Hash槽的数量和Index索引来计算Index条目的起始位置 将当前Index条目的索引值,写在Hash槽absSlotPos位置上;将Index条目的具体信息(hashcode消息偏移量时间差值hash槽的值),从起始偏移量absIndexPos开始,顺序按字节写入。 由于出现了多个偏移量的概念,所以我总结一下:CommitLog中的offset(消息体偏移量)体现在commitlog文件名称中,对应这个CommitLog文件所有消息在整个topic的队列中起始偏移量(方便通过ConsumeQueue。commitlogOffset找到当前要消费的消息存在于哪个commitlog文件)ConsumeQueue中的commitlogOffset(消息体偏移量)定位了当前这条消息在commitlog中的偏移量offsettable。offset(下标)定位了当前已经消费的ConsumeQueue的下标是哪条消息 3。MappedByteBuffer简介 以前我们操作大文件都是用BufferedInputStream、BufferedOutputStream等带缓冲的IO流处理,但是针对大文件读写性能不理想。 MappedByteBuffer是Java提供的基于操作系统虚拟内存映射(MMAP)技术的文件读写API,采用directbuffer的方式读写文件内容,底层不再通过read、write、seek等系统调用实现文件的读写,所以效率非常高。主要用于操作大文件,如上百M、上GB的大文件。RocketMQ使用MappedByteBuffer实现高性能的文件读写。 MMAP原理 一台服务器把本机磁盘文件的内容发送到客户端,一般分为两个步骤:read:读取本地文件内容;write:将读取的内容通过网络发送出去。 普通文件读写 这两个操作发生了两次系统调用,每次系统调用都得先从用户态切换到内核态,等内核完成任务后,再从内核态切换回用户态,也就是消息发送过程中一共发生了4次用户态与内核态的上下文切换。另外还发生了4次数据拷贝,其中两次是DMA的拷贝,另外两次则是通过CPU拷贝的,分别是:DMA把数据从磁盘拷贝到内核态缓冲区;CPU把数据从内核态缓冲区拷贝到用户缓冲区;CPU把数据从用户缓冲区拷贝到内核的网络驱动的socket缓冲区;DMA把数据从网络驱动的socket缓冲区拷贝到网卡的缓冲区中。 mmap文件读写 系统调用函数在调用进程的虚拟地址空间中创建一个新映射。这个映射会直接把内核缓冲区里的数据映射到用户空间,这样就不用从内核空间到用户空间来回复制数据了。 应用进程调用mmap(),DMA把数据从磁盘拷贝到内核缓冲区里; 应用进程调用write(),CPU直接将内核缓冲区的数据拷贝到socket缓冲区中; DMA把数据从内核的socket缓冲区拷贝到网卡的缓冲区里。 通过上面的分析,我们可以发现,比起原始版本,mmapwrite的方式依然需要4次用户态与内核态的上下文切换,但是少了一次内存拷贝。 代码示例:publicstaticvoidread()throwsIOException{try(RandomAccessFilefilenewRandomAccessFile(newFile(test。txt),r)){getChannelFileChannelfileChannelfile。getChannel();getmappedByteBufferfromfileChannelMappedByteBufferbufferfileChannel。map(FileChannel。MapMode。READONLY,0,fileChannel。size());checkbufferLOG。info(isLoadedinphysicalmemory:{},buffer。isLoaded());只是一个提醒而不是guaranteeLOG。info(capacity{},buffer。capacity());readthebufferfor(inti0;ibuffer。limit();i){LOG。info(get{},buffer。get());}}}publicstaticvoidwriteWithMap()throwsIOException{try(RandomAccessFilefilenewRandomAccessFile(newFile(a。txt),rw)){getChannelFileChannelfileChannelfile。getChannel();getmappedByteBufferfromfileChannelMappedByteBufferbufferfileChannel。map(FileChannel。MapMode。READWRITE,0,40968);checkbufferLOG。info(isLoadedinphysicalmemory:{},buffer。isLoaded());只是一个提醒而不是guaranteeLOG。info(capacity{},buffer。capacity());writethecontentbuffer。put(dhy。getBytes());}} FileChannel的map方法有三个参数:MapMode:映射模式,可取值有READONLY(只读映射)、READWRITE(读写映射)、PRIVATE(私有映射),READONLY只支持读,READWRITE支持读写,而PRIVATE只支持在内存中修改,不会写回磁盘;position和size:映射区域,可以是整个文件,也可以是文件的某一部分,单位为字节。4。最精简的RocketMQ文件存储实现(干货) 1。简单索引文件读写 模拟conusmQueue创建10个索引,长度固定20,保存到文件。publicclassFileWrite{publicstaticvoidmain(String〔〕args)throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123111。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();for(inti0;i10;i){mappedByteBuffer。position(i20);ByteBufferbByteBuffer。allocate(20);b。putLong(100);8byte(commitlogoffset)b。putInt(1000);4byte(msgLength)b。putLong(20);8byte(tagCode)b。flip();mappedByteBuffer。put(b);}mappedByteBuffer。force();}}publicclassFileRead{publicstaticvoidmain(String〔〕args)throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123111。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();for(inti0;i10;i){mappedByteBuffer。position(i20);longcommitlogOffsetmappedByteBuffer。getLong();longmsgLenmappedByteBuffer。getInt();longtagCodemappedByteBuffer。getLong();System。out。println(文件读取:commitlogOffset:commitlogOffset,msgLen:msgLen,tagCode:tagCode);}}} 运行结果: 2。基于consumeQueue和CommitLog的读写 手动创建100个消息体,存入commitLog,然后创建索引文件publicclassCommitLogWriteTest{privatestaticLongcommitLogOffset0L;8byte(commitlogoffset)privatestaticLonglastTotalSize0L;publicstaticvoidmain(String〔〕args)throwsIOException{ListConsumerQueueDatalistcreateCommitLog();createConsumerQueue(list);}privatestaticListConsumerQueueDatacreateCommitLog()throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123commitLog。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);fileChannel。close();ListConsumerQueueDatalistnewArrayList();RandomrandomnewRandom();intcount0;for(inti0;i100;i){longcommitLogOffsetlastTotalSize;StringtopicTopictest;StringmsgIdUUID。randomUUID()。toString();StringmsgBody消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg。substring(0,random。nextInt(48));longqueueOffseti;索引偏移量StringtransactionIdUUID。randomUUID()。toString();数据格式,位置固定inttotalSize;消息长度StringmsgId;Stringtopic;longqueueOffset;索引偏移量longbodySize;消息长度byte〔〕body;消息内容StringtransactionId;longcommitLogOffset;从第一个文件开始算的偏移量inttotalSize8totalSize长度64msgId长度64topic长度8索引偏移量长度8消息长度长度msgBody。getBytes(StandardCharsets。UTF8)。length消息内容长度64transactionId长度64commitLogOffset长度;;ByteBufferbByteBuffer。allocate(totalSize);如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300mappedByteBuffer。position(Integer。valueOf(commitLogOffset));b。putLong(totalSize);totalSizeb。put(getBytes(msgId,64));msgIdb。put(getBytes(topic,64));topic,定长64b。putLong(queueOffset);索引偏移量b。putLong(msgBody。getBytes(StandardCharsets。UTF8)。length);bodySizeb。put(msgBody。getBytes(StandardCharsets。UTF8));bodyb。put(getBytes(transactionId,64));b。putLong(commitLogOffset);bodySizeb。flip();mappedByteBuffer。put(b);allTotalSizetotalSizeallTotalSize;System。out。println(写入消息,第:i次);System。out。println(totalSize:totalSize);System。out。println(msgId:msgId);System。out。println(topic:topic);System。out。println(msgBody:msgBody);System。out。println(transactionId:transactionId);System。out。println(commitLogOffset:commitLogOffset);ConsumerQueueDataconsumerQueueDatanewConsumerQueueData();consumerQueueData。setOffset(commitLogOffset);consumerQueueData。setMsgLength(totalSize);consumerQueueData。setTagCode(100L);list。add(consumerQueueData);count;}mappedByteBuffer。force();System。out。println(commitLog数据保存完成,totalSize:count);returnlist;}privatestaticvoidcreateConsumerQueue(ListConsumerQueueDatalist)throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123consumerQueue。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();intcount0;for(inti0;ilist。size();i){ConsumerQueueDataconsumerQueueDatalist。get(i);mappedByteBuffer。position(i20);ByteBufferbByteBuffer。allocate(20);b。putLong(consumerQueueData。getOffset());8byte(commitlogoffset)b。putInt(consumerQueueData。getMsgLength());4byte(msgLength)b。putLong(consumerQueueData。getTagCode());8byte(tagCode)b。flip();很重要,使读指针从头开始mappedByteBuffer。put(b);count;System。out。println(createConsumerQueue:JSON。toJSONString(consumerQueueData));}System。out。println(ConsumerQueue数据保存完成count:count);mappedByteBuffer。force();}将变长字符串定长byte〔〕,方便读取privatestaticbyte〔〕getBytes(Strings,intlength){intfixLengthlengths。getBytes()。length;if(s。getBytes()。lengthlength){byte〔〕Sbytesnewbyte〔length〕;System。arraycopy(s。getBytes(),0,Sbytes,0,s。getBytes()。length);for(intxlengthfixLength;xlength;x){Sbytes〔x〕0x00;}returnSbytes;}returns。getBytes(StandardCharsets。UTF8);}} 运行结果:(数据有100条,没展示全部) 读取索引文件,然后根据偏移量在commitLog文件中读取消息publicclassCommitLogReadTest{staticFileChannelcommitLogfileChannelnull;publicstaticvoidmain(String〔〕args)throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123consumerQueue。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();intindex0;for(intiindex;i100;i){根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中mappedByteBuffer。position(i20);longcommitlogOffsetmappedByteBuffer。getLong();System。out。println(commitlogOffset);longmsgLenmappedByteBuffer。getInt();LongtagmappedByteBuffer。getLong();System。out。println(读取到consumerQueue,commitlogOffset:commitlogOffset,msgLen:msgLen);根据偏移量读取CcommitLogreadCommitLog(Integer。valueOf(commitlogOffset));}}publicstaticMappedByteBufferinitFileChannel()throwsIOException{MappedByteBuffermappedByteBuffernull;if(mappedByteBuffernull){commitLogfileChannelFileChannel。open(Paths。get(URI。create(file:c:123commitLog。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);mappedByteBuffercommitLogfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);commitLogfileChannel。close();}returnmappedByteBuffer;}根据偏移量读取CommitLogpublicstaticvoidreadCommitLog(intoffset)throwsIOException{写入顺序,读的时候也按这个顺序读取b。putLong(totalSize);totalSizeb。put(getBytes(msgId,64));msgIdb。put(getBytes(topic,64));topic,定长64b。putLong(queueOffset);索引偏移量b。putLong(msgBody。getBytes(StandardCharsets。UTF8)。length);bodySizeb。put(msgBody。getBytes(StandardCharsets。UTF8));bodyb。put(getBytes(transactionId,64));b。putLong(commitLogOffset);commitLogOffsetSystem。out。println(commitlog读取偏移量为offset的消息);MappedByteBuffermappedByteBufferinitFileChannel();很重要,按偏移量读取文件,入参为索引文件记录的偏移量mappedByteBuffer。position(offset);longtotalSizemappedByteBuffer。getLong();消息长度byte〔〕msgIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(msgIdByte);byte〔〕topicBytenewbyte〔64〕;固定是64mappedByteBuffer。get(topicByte);longqueueOffsetmappedByteBuffer。getLong();LongbodySizemappedByteBuffer。getLong();byte〔〕bodyBytenewbyte〔Integer。parseInt(bodySize)〕;bodySize长度不固定mappedByteBuffer。get(bodyByte);byte〔〕transactionIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(transactionIdByte);longcommitLogOffsetmappedByteBuffer。getLong();偏移量System。out。println(totalSize:totalSize);System。out。println(msgId:newString(msgIdByte));System。out。println(topic:newString(topicByte));System。out。println(queueOffset:queueOffset);System。out。println(bodySize:bodySize);System。out。println(body:newString(bodyByte));System。out。println(transactionId:newString(transactionIdByte));System。out。println(commitLogOffset:commitLogOffset);}} 运行结果:(数据有100条,没展示全部) 总结: 本文介绍了RocketMQ的文件存储基本原理,并基于JavaNIO的MappedByteBuffer实现了对RocketMQ的存储文件CommotLog,索引文件ConsumeQueue的写入,以及按索引下标读取CommotLog的,希望能加深大家对RocketMQ文件存储的理解。