Preface

RocketMQ is a widely used open-source message middleware, and many Java developers both use it and study its source code. There are plenty of articles about the RocketMQ source online, but many of them analyze the code only from a framework developer's perspective and do not dig into the techniques underneath. Readers often finish them with only a partial understanding, and when they try to write something similar themselves they do not know which techniques to use or where to start. Based on my study of RocketMQ's file storage, and using techniques most developers already know, I implemented a simplified version of the RocketMQ file system by hand. I hope that peeling it back layer by layer helps developers understand the essence of how RocketMQ stores files, and that readers can generalize from it.

RocketMQ logical storage structure

This article is for readers who have some understanding of RocketMQ's file storage principles, are familiar with Java NIO, and want to see how RocketMQ's storage can be built on Java NIO. The code follows.

1. Generate 10 messages by hand and create the commitLog, consumeQueue and indexFile files

    package org.apache.rocketmq.test.smoke;

    import com.alibaba.fastjson.JSON;
    // ConsumerQueueData and IndexFileItemData are simple data holders whose source is not listed in this article.
    import org.apache.rocketmq.test.smoke.model.ConsumerQueueData;
    import org.apache.rocketmq.test.smoke.model.IndexFileItemData;

    import java.io.IOException;
    import java.net.URI;
    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.UUID;

    public class CommitLogWriteTest {

        // End of the data written so far; the next message starts at this commitLog offset.
        private static long lastTotalSize = 0L;
        private static List<ConsumerQueueData> consumerQueueDatas = new ArrayList<>();
        private static List<IndexFileItemData> indexFileItemDatas = new ArrayList<>();
        private static final int MESSAGE_COUNT = 10;

        public static void main(String[] args) throws IOException {
            createCommitLog();
            createConsumerQueue();
            createIndexFile();
        }

        private static void createCommitLog() throws IOException {
            // The directory c:/123 must already exist; CREATE only creates the file.
            FileChannel fileChannel = FileChannel.open(
                    Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            // A mapping stays valid after its channel is closed.
            fileChannel.close();
            Random random = new Random();
            int count = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                long commitLogOffset = lastTotalSize;
                String topic = "Topictest";
                String msgId = UUID.randomUUID().toString();
                // "消息内容" means "message content" (12 bytes in UTF-8); it is followed by 0 to 47 filler characters.
                String msgBody = "消息内容"
                        + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48));
                long queueOffset = i; // index offset
                String transactionId = UUID.randomUUID().toString();
                byte[] body = msgBody.getBytes(StandardCharsets.UTF_8);

                /*
                 * Record format; every field sits at a fixed position:
                 *   int    totalSize;        // record length
                 *   String msgId;
                 *   String topic;
                 *   long   queueOffset;      // index offset
                 *   long   bodySize;         // body length
                 *   byte[] body;             // message content
                 *   String transactionId;
                 *   long   commitLogOffset;  // offset counted from the start of the first file
                 */
                int totalSize = 8        // totalSize
                        + 64             // msgId
                        + 64             // topic
                        + 8              // index offset (queueOffset)
                        + 8              // bodySize
                        + body.length    // body
                        + 64             // transactionId
                        + 64;            // commitLogOffset: 64 bytes are reserved although putLong below writes
                                         // only 8, so every record ends with 56 unused bytes

                ByteBuffer b = ByteBuffer.allocate(totalSize);
                // If three messages have lengths 100, 200 and 350, their offsets are 0, 100 and 300.
                mappedByteBuffer.position((int) commitLogOffset);
                b.putLong(totalSize);                // totalSize
                b.put(getBytes(msgId, 64));          // msgId
                b.put(getBytes(topic, 64));          // topic, fixed length 64
                b.putLong(queueOffset);              // index offset
                b.putLong(body.length);              // bodySize
                b.put(body);                         // body
                b.put(getBytes(transactionId, 64));  // transactionId
                b.putLong(commitLogOffset);          // commitLogOffset
                b.flip();
                mappedByteBuffer.put(b);
                lastTotalSize = totalSize + lastTotalSize;

                System.out.println("Writing message #" + i);
                System.out.println("totalSize:" + totalSize);
                System.out.println("msgId:" + msgId);
                System.out.println("topic:" + topic);
                System.out.println("msgBody:" + msgBody);
                System.out.println("transactionId:" + transactionId);
                System.out.println("commitLogOffset:" + commitLogOffset);

                // Collect the data for the consumeQueue file.
                ConsumerQueueData consumerQueueData = new ConsumerQueueData();
                consumerQueueData.setOffset(commitLogOffset);
                consumerQueueData.setMsgLength(totalSize);
                consumerQueueData.setTagCode(100L);
                consumerQueueDatas.add(consumerQueueData);

                // Collect the data for the indexFile.
                IndexFileItemData indexFileItemData = new IndexFileItemData();
                indexFileItemData.setKeyHash(msgId.hashCode());
                indexFileItemData.setMessageId(msgId);
                indexFileItemData.setPhyOffset(commitLogOffset);
                indexFileItemDatas.add(indexFileItemData);

                mappedByteBuffer.force();
                count++;
            }
            System.out.println("commitLog data saved, totalSize:" + count);
        }

        public static void createConsumerQueue() throws IOException {
            System.out.println("Creating the consumerQueue file");
            FileChannel fileChannel = FileChannel.open(
                    Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            fileChannel.close();
            int count = 0;
            for (int i = 0; i < consumerQueueDatas.size(); i++) {
                ConsumerQueueData consumerQueueData = consumerQueueDatas.get(i);
                // Every entry is 20 bytes long, so entry i starts at byte i * 20.
                mappedByteBuffer.position(i * 20);
                mappedByteBuffer.putLong(consumerQueueData.getOffset());    // 8 bytes (commitLog offset)
                mappedByteBuffer.putInt(consumerQueueData.getMsgLength());  // 4 bytes (msgLength)
                mappedByteBuffer.putLong(consumerQueueData.getTagCode());   // 8 bytes (tagCode)
                count++;
                System.out.println("consumerQueue entry written:" + JSON.toJSONString(consumerQueueData));
                mappedByteBuffer.force();
            }
            System.out.println("ConsumerQueue data saved, count:" + count);
        }

        public static void createIndexFile() throws IOException {
            System.out.println("Creating the index file");
            // File creation time: the file is created when the first message is written.
            FileChannel fileChannel = FileChannel.open(
                    Paths.get(URI.create("file:/c:/123/index.txt")),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
            // View of the same bytes starting at position 0, used for the 40-byte header.
            ByteBuffer headerByteBuffer = mappedByteBuffer.slice();
            long firstDataTime = System.currentTimeMillis();
            fileChannel.close();

            // The hash slots are written right after the header.
            // Number of slots that have an index entry attached. Not every slot has one, and with
            // hash collisions several entries share one slot.
            int hashSlotCount = 0;
            // Number of index entries in this indexFile, summed over all slots.
            // Without hash collisions, hashSlotCount == indexCount.
            int indexCount = 0;
            // Assume 100 slots (400 bytes in total).
            int slotNum = 100;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                IndexFileItemData indexFileItemData = indexFileItemDatas.get(i);
                int keyHash = indexFileItemData.getKeyHash();
                // Take the hash modulo 100 to pick a slot; a negative remainder is negated.
                int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);
                // Byte position of the slot, which stores the number of the index entry.
                int absSlotPos = 40 + slotPos * 4;
                // Byte position of the index entry itself.
                int absIndexPos = 40 + slotNum * 4 + indexCount * 20;

                // Store indexCount in the slot: the key's data lives in entry number indexCount.
                mappedByteBuffer.putInt(absSlotPos, indexCount);

                // Write the entry.
                mappedByteBuffer.putInt(absIndexPos, indexFileItemData.getKeyHash());         // 4 bytes: key hash code
                mappedByteBuffer.putLong(absIndexPos + 4, indexFileItemData.getPhyOffset());  // 8 bytes: phyOffset
                mappedByteBuffer.putInt(absIndexPos + 4 + 8,
                        (int) (System.currentTimeMillis() - firstDataTime));                  // 4 bytes: timeDiff
                mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, 0);                          // 4 bytes: preIndex, left at 0;
                                                                                              // hash collisions are not handled here

                // First entry: write the "begin" fields of the header.
                if (i == 0) {
                    // Store time of the first message in this indexFile.
                    headerByteBuffer.putLong(0, firstDataTime);
                    // commitLog offset of the first message in this indexFile.
                    mappedByteBuffer.putLong(16, indexFileItemData.getPhyOffset());
                }
                // Simulated last entry: write the "end" fields of the header.
                // With MESSAGE_COUNT = 10 this branch is never reached, so the end timestamp and end
                // offset keep whatever the file already contained.
                if (i == 99) {
                    // Store time of the last message in this indexFile.
                    headerByteBuffer.putLong(8, System.currentTimeMillis());
                    // commitLog offset of the last message in this indexFile.
                    headerByteBuffer.putLong(24, indexFileItemData.getPhyOffset());
                }
                // Number of slots that have an index entry.
                headerByteBuffer.putInt(32, hashSlotCount + 1);
                // Number of index entries in this indexFile.
                headerByteBuffer.putInt(36, indexCount + 1);
                mappedByteBuffer.force();
                System.out.println("msgId:" + indexFileItemData.getMessageId()
                        + ", keyHash:" + keyHash
                        + ", slot:" + slotPos
                        + ", absSlotPos=" + absSlotPos
                        + ", index=" + indexCount
                        + ", absIndexPos:" + absIndexPos
                        + ", commitLog phyOffset:" + indexFileItemData.getPhyOffset());
                indexCount++;
                hashSlotCount++;
            }
        }

        // Turn a variable-length string into a fixed-length byte[] so it can be read back at a known
        // position. Shorter values are padded with 0x00; longer values are truncated to length.
        private static byte[] getBytes(String s, int length) {
            return Arrays.copyOf(s.getBytes(StandardCharsets.UTF_8), length);
        }
    }

Run output:

    Writing message #0
    totalSize:322
    msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
    transactionId:9453ba39-3982-40e9-926d-47b51d360590
    commitLogOffset:0
    Writing message #1
    totalSize:306
    msgId:d0fbf80f-223b-4721-a43e-518b152decc2
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgms
    transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3
    commitLogOffset:322
    Writing message #2
    totalSize:307
    msgId:199053e3-e616-4611-ab0d-e5c7af4549a9
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsg
    transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767
    commitLogOffset:628
    Writing message #3
    totalSize:339
    msgId:8e799d8e-3290-4f6b-ab5d-289153446994
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
    transactionId:98b46b96-cc88-4969-a56f-282d25799085
    commitLogOffset:935
    Writing message #4
    totalSize:320
    msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
    transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
    commitLogOffset:1274
    Writing message #5
    totalSize:312
    msgId:b33c6f31-cc96-462b-b095-99410459082c
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgms
    transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c
    commitLogOffset:1594
    Writing message #6
    totalSize:324
    msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
    transactionId:29601335-3fcd-4193-b14f-140bbaf409a4
    commitLogOffset:1906
    Writing message #7
    totalSize:293
    msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a
    topic:Topictest
    msgBody:消息内容m
    transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370
    commitLogOffset:2230
    Writing message #8
    totalSize:323
    msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b
    topic:Topictest
    msgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
    transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2
    commitLogOffset:2523
    Writing message #9
    totalSize:296
    msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929
    topic:Topictest
    msgBody:消息内容msgm
    transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345
    commitLogOffset:2846
    commitLog data saved, totalSize:10
    Creating the consumerQueue file
    consumerQueue entry written:{"msgLength":322,"offset":0,"tagCode":100}
    consumerQueue entry written:{"msgLength":306,"offset":322,"tagCode":100}
    consumerQueue entry written:{"msgLength":307,"offset":628,"tagCode":100}
    consumerQueue entry written:{"msgLength":339,"offset":935,"tagCode":100}
    consumerQueue entry written:{"msgLength":320,"offset":1274,"tagCode":100}
    consumerQueue entry written:{"msgLength":312,"offset":1594,"tagCode":100}
    consumerQueue entry written:{"msgLength":324,"offset":1906,"tagCode":100}
    consumerQueue entry written:{"msgLength":293,"offset":2230,"tagCode":100}
    consumerQueue entry written:{"msgLength":323,"offset":2523,"tagCode":100}
    consumerQueue entry written:{"msgLength":296,"offset":2846,"tagCode":100}
    ConsumerQueue data saved, count:10
    Creating the index file
    msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0, keyHash:249765627, slot:27, absSlotPos=148, index=0, absIndexPos:440, commitLog phyOffset:0
    msgId:d0fbf80f-223b-4721-a43e-518b152decc2, keyHash:1587335015, slot:15, absSlotPos=100, index=1, absIndexPos:460, commitLog phyOffset:322
    msgId:199053e3-e616-4611-ab0d-e5c7af4549a9, keyHash:791210473, slot:73, absSlotPos=332, index=2, absIndexPos:480, commitLog phyOffset:628
    msgId:8e799d8e-3290-4f6b-ab5d-289153446994, keyHash:1460275929, slot:29, absSlotPos=156, index=3, absIndexPos:500, commitLog phyOffset:935
    msgId:8b78474f-b28a-4442-99a0-6f7883f0302b, keyHash:1174005465, slot:65, absSlotPos=300, index=4, absIndexPos:520, commitLog phyOffset:1274
    msgId:b33c6f31-cc96-462b-b095-99410459082c, keyHash:1695757800, slot:0, absSlotPos=40, index=5, absIndexPos:540, commitLog phyOffset:1594
    msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0, keyHash:1334295408, slot:8, absSlotPos=72, index=6, absIndexPos:560, commitLog phyOffset:1906
    msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a, keyHash:1287318090, slot:90, absSlotPos=400, index=7, absIndexPos:580, commitLog phyOffset:2230
    msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b, keyHash:239865974, slot:74, absSlotPos=336, index=8, absIndexPos:600, commitLog phyOffset:2523
    msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929, keyHash:1173357775, slot:75, absSlotPos=340, index=9, absIndexPos:620, commitLog phyOffset:2846

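The consumeQueue file written in step 1 is an array of fixed 20-byte entries (8-byte commitLog offset, 4-byte message length, 8-byte tag code), so entry i is found at byte i * 20 without any searching. The following is a minimal sketch of that layout, not RocketMQ's own code: it uses a heap ByteBuffer instead of a mapped file, and the names ConsumeQueueEntrySketch, writeEntry and readEntry are made up for illustration.

```java
import java.nio.ByteBuffer;

public class ConsumeQueueEntrySketch {
    // commitLog offset (8) + message length (4) + tag code (8)
    static final int ENTRY_SIZE = 8 + 4 + 8;

    // Write entry number `index` using absolute positions, so the buffer's own position is untouched.
    public static void writeEntry(ByteBuffer queue, int index, long commitLogOffset, int msgLength, long tagCode) {
        int pos = index * ENTRY_SIZE;
        queue.putLong(pos, commitLogOffset);
        queue.putInt(pos + 8, msgLength);
        queue.putLong(pos + 12, tagCode);
    }

    // Read entry number `index` back as {commitLogOffset, msgLength, tagCode}.
    public static long[] readEntry(ByteBuffer queue, int index) {
        int pos = index * ENTRY_SIZE;
        return new long[] {queue.getLong(pos), queue.getInt(pos + 8), queue.getLong(pos + 12)};
    }

    public static void main(String[] args) {
        // Room for 10 entries; a heap buffer stands in for the mapped file (assumption of this sketch).
        ByteBuffer queue = ByteBuffer.allocate(10 * ENTRY_SIZE);
        writeEntry(queue, 0, 0L, 322, 100L);
        writeEntry(queue, 1, 322L, 306, 100L);
        long[] second = readEntry(queue, 1);
        System.out.println("offset=" + second[0] + ", length=" + second[1] + ", tagCode=" + second[2]);
        // prints: offset=322, length=306, tagCode=100
    }
}
```

Because every entry has the same size, a consumer only needs to remember an entry number to resume reading; the variable-length records stay in the commitLog.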
2. Read the consumeQueue file and, using each offset, read a complete message from the commitLog

    package org.apache.rocketmq.test.smoke;

    import java.io.IOException;
    import java.net.URI;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class ConsumeQueueMessageReadTest {

        // Lazily created mapping of the commitLog file.
        public static MappedByteBuffer mappedByteBuffer = null;
        private static final int MESSAGE_COUNT = 10;

        public static void main(String[] args) throws IOException {
            FileChannel fileChannel = FileChannel.open(
                    Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);
            MappedByteBuffer queueBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            fileChannel.close();
            queueBuffer.position(0);

            // Read entries by entry number. In a real deployment the starting point is the consumer's
            // latest position, which the broker keeps in its offset file.
            int index = 0;
            for (int i = index; i < MESSAGE_COUNT; i++) {
                queueBuffer.position(i * 20);
                long commitlogOffset = queueBuffer.getLong();
                System.out.println(commitlogOffset);
                long msgLen = queueBuffer.getInt();
                long tagCode = queueBuffer.getLong();
                System.out.println("Read from consumerQueue, commitlogOffset:" + commitlogOffset + ", msgLen:" + msgLen);
                // Read the commitLog at that offset.
                readCommitLogByOffset((int) commitlogOffset);
            }
        }

        public static MappedByteBuffer initFileChannel() throws IOException {
            if (mappedByteBuffer == null) {
                FileChannel commitLogFileChannel = FileChannel.open(
                        Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                        StandardOpenOption.WRITE, StandardOpenOption.READ);
                mappedByteBuffer = commitLogFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
                commitLogFileChannel.close();
            }
            return mappedByteBuffer;
        }

        // Read one message from the commitLog at the given offset.
        public static void readCommitLogByOffset(int offset) throws IOException {
            /*
             * Fields must be read in the order they were written:
             *   b.putLong(totalSize);                // totalSize
             *   b.put(getBytes(msgId, 64));          // msgId
             *   b.put(getBytes(topic, 64));          // topic, fixed length 64
             *   b.putLong(queueOffset);              // index offset
             *   b.putLong(body.length);              // bodySize
             *   b.put(body);                         // body
             *   b.put(getBytes(transactionId, 64));  // transactionId
             *   b.putLong(commitLogOffset);          // commitLogOffset
             */
            System.out.println("Reading the commitLog message at offset " + offset);
            MappedByteBuffer mappedByteBuffer = initFileChannel();
            mappedByteBuffer.position(offset);

            long totalSize = mappedByteBuffer.getLong();          // record length
            byte[] msgIdByte = new byte[64];                      // the UUID field is fixed at 64 bytes
            mappedByteBuffer.get(msgIdByte);
            byte[] topicByte = new byte[64];                      // fixed at 64 bytes
            mappedByteBuffer.get(topicByte);
            long queueOffset = mappedByteBuffer.getLong();
            long bodySize = mappedByteBuffer.getLong();
            byte[] bodyByte = new byte[(int) bodySize];           // the body length varies
            mappedByteBuffer.get(bodyByte);
            byte[] transactionIdByte = new byte[64];              // the UUID field is fixed at 64 bytes
            mappedByteBuffer.get(transactionIdByte);
            long commitLogOffset = mappedByteBuffer.getLong();    // offset

            // trim() drops the 0x00 padding of the fixed-length fields.
            System.out.println("totalSize:" + totalSize);
            System.out.println("msgId:" + new String(msgIdByte, StandardCharsets.UTF_8).trim());
            System.out.println("topic:" + new String(topicByte, StandardCharsets.UTF_8).trim());
            System.out.println("queueOffset:" + queueOffset);
            System.out.println("bodySize:" + bodySize);
            System.out.println("body:" + new String(bodyByte, StandardCharsets.UTF_8));
            System.out.println("transactionId:" + new String(transactionIdByte, StandardCharsets.UTF_8).trim());
            System.out.println("commitLogOffset:" + commitLogOffset);
        }
    }

Run output:

    Reading the commitLog message at offset 0
    totalSize:322
    msgId:61b5c500-f7f5-4bfe-beb1-50a8148534c0
    topic:Topictest
    queueOffset:0
    bodySize:42
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsg
    transactionId:9453ba39-3982-40e9-926d-47b51d360590
    commitLogOffset:0
    Reading the commitLog message at offset 322
    totalSize:306
    msgId:d0fbf80f-223b-4721-a43e-518b152decc2
    topic:Topictest
    queueOffset:1
    bodySize:26
    body:消息内容msgmsgmsgmsgms
    transactionId:e2ef1652-58fa-4849-bf74-885c7e5db9e3
    commitLogOffset:322
    Reading the commitLog message at offset 628
    totalSize:307
    msgId:199053e3-e616-4611-ab0d-e5c7af4549a9
    topic:Topictest
    queueOffset:2
    bodySize:27
    body:消息内容msgmsgmsgmsgmsg
    transactionId:33d21abe-0d8e-4c0e-9c78-f415daefd767
    commitLogOffset:628
    Reading the commitLog message at offset 935
    totalSize:339
    msgId:8e799d8e-3290-4f6b-ab5d-289153446994
    topic:Topictest
    queueOffset:3
    bodySize:59
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
    transactionId:98b46b96-cc88-4969-a56f-282d25799085
    commitLogOffset:935
    Reading the commitLog message at offset 1274
    totalSize:320
    msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
    topic:Topictest
    queueOffset:4
    bodySize:40
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
    transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
    commitLogOffset:1274
    Reading the commitLog message at offset 1594
    totalSize:312
    msgId:b33c6f31-cc96-462b-b095-99410459082c
    topic:Topictest
    queueOffset:5
    bodySize:32
    body:消息内容msgmsgmsgmsgmsgmsgms
    transactionId:57420047-2539-43fa-a3f2-b2f55c7b059c
    commitLogOffset:1594
    Reading the commitLog message at offset 1906
    totalSize:324
    msgId:d0a6803f-8555-418e-988a-b3b9a70d14f0
    topic:Topictest
    queueOffset:6
    bodySize:44
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgms
    transactionId:29601335-3fcd-4193-b14f-140bbaf409a4
    commitLogOffset:1906
    Reading the commitLog message at offset 2230
    totalSize:293
    msgId:91151ec5-e76b-4560-90b7-ab77f9d04c9a
    topic:Topictest
    queueOffset:7
    bodySize:13
    body:消息内容m
    transactionId:291e54de-2ebe-41b1-b974-e81a2e9f1370
    commitLogOffset:2230
    Reading the commitLog message at offset 2523
    totalSize:323
    msgId:eb21df35-b4dc-43aa-8604-e9a103f25a7b
    topic:Topictest
    queueOffset:8
    bodySize:43
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgm
    transactionId:3005a39d-b8cb-4138-ae05-34b65fc135a2
    commitLogOffset:2523
    Reading the commitLog message at offset 2846
    totalSize:296
    msgId:abcda364-8fc3-4d18-ae5d-1d7a8ffd0929
    topic:Topictest
    queueOffset:9
    bodySize:16
    body:消息内容msgm
    transactionId:d42733b5-3911-4f0a-b1db-11eb45a30345
    commitLogOffset:2846

3. Look up a messageId in the indexFile, then read the complete message from the commitLog at the offset found

    package org.apache.rocketmq.test.smoke;

    import java.io.IOException;
    import java.net.URI;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public class IndexFileMessageReadTest {

        // Lazily created mapping of the commitLog file.
        public static MappedByteBuffer mappedByteBuffer = null;

        public static void main(String[] args) throws IOException {
            // One of the message ids printed by CommitLogWriteTest.
            String msgId = "8b78474f-b28a-4442-99a0-6f7883f0302b";
            readByMessageId(msgId);
        }

        private static void readByMessageId(String messageId) throws IOException {
            FileChannel indexFileChannel = FileChannel.open(
                    Paths.get(URI.create("file:/c:/123/index.txt")),
                    StandardOpenOption.WRITE, StandardOpenOption.READ);
            MappedByteBuffer indexMappedByteBuffer = indexFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            indexFileChannel.close();

            // The 40-byte header is read sequentially from position 0.
            System.out.println("get indexFile header");
            System.out.println("beginTimestampIndex:" + indexMappedByteBuffer.getLong());
            System.out.println("endTimestampIndex:" + indexMappedByteBuffer.getLong());
            System.out.println("beginPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
            System.out.println("endPhyoffsetIndex:" + indexMappedByteBuffer.getLong());
            System.out.println("hashSlotcountIndex:" + indexMappedByteBuffer.getInt());
            System.out.println("indexCountIndex:" + indexMappedByteBuffer.getInt());
            System.out.println();

            int keyHash = messageId.hashCode();
            // Take the hash modulo 100 to pick a slot, exactly as the writer does.
            int slotPos = keyHash % 100 > 0 ? keyHash % 100 : -1 * (keyHash % 100);
            System.out.println("messageId:" + messageId + ", slot (modulo):" + slotPos);
            // Byte position of the slot in the file.
            int absSlotPos = 40 + slotPos * 4;
            System.out.println("Byte position of the hash slot: (40+" + slotPos + "*4)=" + absSlotPos);
            // The slot stores the number of the index entry.
            int index = indexMappedByteBuffer.getInt(absSlotPos);
            // Byte position of that index entry.
            int absIndexPos = 40 + 100 * 4 + index * 20;
            System.out.println("Entry number index=" + index
                    + ", byte position of the entry: (40+100*4+index*20)=" + absIndexPos);

            long keyHash1 = indexMappedByteBuffer.getInt(absIndexPos);
            long pyhOffset = indexMappedByteBuffer.getLong(absIndexPos + 4);
            int timeDiff = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8);
            int preIndexNo = indexMappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
            System.out.println("commitLog offset read from the index: " + pyhOffset);
            System.out.println();
            readCommitLogByOffset((int) pyhOffset);
        }

        public static MappedByteBuffer initFileChannel() throws IOException {
            if (mappedByteBuffer == null) {
                FileChannel commitLogFileChannel = FileChannel.open(
                        Paths.get(URI.create("file:/c:/123/commitLog.txt")),
                        StandardOpenOption.WRITE, StandardOpenOption.READ);
                mappedByteBuffer = commitLogFileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);
                commitLogFileChannel.close();
            }
            return mappedByteBuffer;
        }

        // Read one message from the commitLog at the given offset.
        public static void readCommitLogByOffset(int offset) throws IOException {
            /*
             * Fields must be read in the order they were written:
             *   b.putLong(totalSize);                // totalSize
             *   b.put(getBytes(msgId, 64));          // msgId
             *   b.put(getBytes(topic, 64));          // topic, fixed length 64
             *   b.putLong(queueOffset);              // index offset
             *   b.putLong(body.length);              // bodySize
             *   b.put(body);                         // body
             *   b.put(getBytes(transactionId, 64));  // transactionId
             *   b.putLong(commitLogOffset);          // commitLogOffset
             */
            System.out.println("Reading the commitLog message at offset " + offset);
            MappedByteBuffer mappedByteBuffer = initFileChannel();
            mappedByteBuffer.position(offset);

            long totalSize = mappedByteBuffer.getLong();          // record length
            byte[] msgIdByte = new byte[64];                      // the UUID field is fixed at 64 bytes
            mappedByteBuffer.get(msgIdByte);
            byte[] topicByte = new byte[64];                      // fixed at 64 bytes
            mappedByteBuffer.get(topicByte);
            long queueOffset = mappedByteBuffer.getLong();
            long bodySize = mappedByteBuffer.getLong();
            byte[] bodyByte = new byte[(int) bodySize];           // the body length varies
            mappedByteBuffer.get(bodyByte);
            byte[] transactionIdByte = new byte[64];              // the UUID field is fixed at 64 bytes
            mappedByteBuffer.get(transactionIdByte);
            long commitLogOffset = mappedByteBuffer.getLong();    // offset

            // trim() drops the 0x00 padding of the fixed-length fields.
            System.out.println("totalSize:" + totalSize);
            System.out.println("msgId:" + new String(msgIdByte, StandardCharsets.UTF_8).trim());
            System.out.println("topic:" + new String(topicByte, StandardCharsets.UTF_8).trim());
            System.out.println("queueOffset:" + queueOffset);
            System.out.println("bodySize:" + bodySize);
            System.out.println("body:" + new String(bodyByte, StandardCharsets.UTF_8));
            System.out.println("transactionId:" + new String(transactionIdByte, StandardCharsets.UTF_8).trim());
            System.out.println("commitLogOffset:" + commitLogOffset);
        }

        // Convert a long to 8 big-endian bytes (helper, not called in this test).
        public static byte[] toByteArray(long number) {
            byte length = Long.BYTES;
            byte[] bytes = new byte[length];
            for (byte i = 0; i < length; i++) {
                bytes[length - 1 - i] = (byte) number;
                number >>= 8;
            }
            return bytes;
        }
    }

Run output:

    get indexFile header
    beginTimestampIndex:1669554286826
    endTimestampIndex:1669552196010
    beginPhyoffsetIndex:0
    endPhyoffsetIndex:31259
    hashSlotcountIndex:10
    indexCountIndex:10

    messageId:8b78474f-b28a-4442-99a0-6f7883f0302b, slot (modulo):65
    Byte position of the hash slot: (40+65*4)=300
    Entry number index=4, byte position of the entry: (40+100*4+index*20)=520
    commitLog offset read from the index: 1274

    Reading the commitLog message at offset 1274
    totalSize:320
    msgId:8b78474f-b28a-4442-99a0-6f7883f0302b
    topic:Topictest
    queueOffset:4
    bodySize:40
    body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgm
    transactionId:5c0ff6fe-aea3-40c3-8647-6f4bdd797a78
    commitLogOffset:1274

This article built a minimal version of RocketMQ's file storage on top of Java NIO, in the hope of helping developers understand how the file system works underneath. Discussion is welcome, and so are corrections wherever it falls short.
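A follow-up on the index file from step 1: the writer always sets preIndex to 0 and does not handle two keys landing in the same slot. One way the field could be used is sketched below under stated assumptions, and it is not presented as RocketMQ's actual code. Entry numbers start at 1 so that a slot value of 0 can mean "empty", each new entry stores the slot's previous value in preIndex, and a lookup follows that chain while comparing key hashes. The class IndexChainSketch and its methods are hypothetical names, and a heap ByteBuffer stands in for the mapped file.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class IndexChainSketch {
    static final int HEADER_SIZE = 40;
    static final int SLOT_NUM = 100;
    static final int SLOT_SIZE = 4;
    static final int ENTRY_SIZE = 20;   // keyHash(4) + phyOffset(8) + timeDiff(4) + preIndex(4)
    static final int MAX_ENTRIES = 1000; // capacity chosen for this sketch only

    private final ByteBuffer file =
            ByteBuffer.allocate(HEADER_SIZE + SLOT_NUM * SLOT_SIZE + MAX_ENTRIES * ENTRY_SIZE);
    private int indexCount = 0;

    // Same slot rule as the article: absolute value of the remainder.
    public static int slotOf(int keyHash) {
        return Math.abs(keyHash % SLOT_NUM);
    }

    // Byte position of a 1-based entry number.
    static int entryPos(int entryNo) {
        return HEADER_SIZE + SLOT_NUM * SLOT_SIZE + (entryNo - 1) * ENTRY_SIZE;
    }

    public void put(String key, long phyOffset) {
        int keyHash = key.hashCode();
        int slotPos = HEADER_SIZE + slotOf(keyHash) * SLOT_SIZE;
        int previous = file.getInt(slotPos);   // 0 when the slot is still empty
        int entryNo = ++indexCount;            // 1-based, so 0 never names a real entry
        int pos = entryPos(entryNo);
        file.putInt(pos, keyHash);
        file.putLong(pos + 4, phyOffset);
        file.putInt(pos + 12, 0);              // timeDiff is not used in this sketch
        file.putInt(pos + 16, previous);       // link to the entry that owned the slot before
        file.putInt(slotPos, entryNo);         // the slot now points at the newest entry
    }

    // Returns the commitLog offsets of all entries whose hash equals the key's hash, newest first.
    public List<Long> get(String key) {
        int keyHash = key.hashCode();
        List<Long> offsets = new ArrayList<>();
        int entryNo = file.getInt(HEADER_SIZE + slotOf(keyHash) * SLOT_SIZE);
        while (entryNo != 0) {
            int pos = entryPos(entryNo);
            if (file.getInt(pos) == keyHash) {
                offsets.add(file.getLong(pos + 4));
            }
            entryNo = file.getInt(pos + 16);   // follow the preIndex chain
        }
        return offsets;
    }

    public static void main(String[] args) {
        IndexChainSketch index = new IndexChainSketch();
        index.put("Aa", 0L);    // hashCode 2112, slot 12
        index.put("p", 322L);   // hashCode 112, also slot 12: a slot collision
        System.out.println(index.get("Aa")); // prints [0]
        System.out.println(index.get("p"));  // prints [322]
    }
}
```

Only hashes are stored, so two different keys with the same hash code cannot be told apart inside the index; a caller would still have to read the record at each returned offset and compare the real key.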