专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

自己动手实现一个RocketMQ文件存储系统

  前言
  RocketMQ作为一款优秀的开源消息中间件,很多java开发者都在使用并研究里面的源码。目前网上有很多关于RocketMQ源代码的文章,但是很多文章只是从框架开发者的的角度分析源码,没有从技术实现本质进行剖析。因此很多源码学习者在读完后还是一知半解,当自己想动手写的时候不知要用到哪种技术,无从着手。笔者基于对RocketMQ的文件存储研究,结合开发者常见的技术,自己动手实现了一个简化版本的RocketMQ文件系统,希望能抽丝剥茧,帮助开发者从本质上理解RocketMQ文件存储的原理,起到抛砖引玉,举一反三的作用。
  RocketMQ逻辑存储结构
  本文适合对RocketMQ的文件存储原理有一定的了解,熟悉javaNIO,希望了解RocketMQ是如何通过javaNIO实现的读者。以下代码部分:
  1。手动生成10个消息,并创建commitLog文件,consumeQueue,indexFile文件packageorg。apache。rocketmq。test。smoke;importcom。alibaba。fastjson。JSON;importorg。apache。rocketmq。test。smoke。model。ConsumerQueueData;importorg。apache。rocketmq。test。smoke。model。IndexFileHeaderData;importorg。apache。rocketmq。test。smoke。model。IndexFileItemData;importjava。io。IOException;importjava。net。URI;importjava。nio。ByteBuffer;importjava。nio。MappedByteBuffer;importjava。nio。channels。FileChannel;importjava。nio。charset。StandardCharsets;importjava。nio。file。Paths;importjava。nio。file。StandardOpenOption;importjava。util。ArrayList;importjava。util。List;importjava。util。Random;importjava。util。UUID;publicclassCommitLogWriteTest{privatestaticLongcommitLogOffset0L;8byte(commitlogoffset)privatestaticLonglastTotalSize0L;privatestaticLongcurrentTotalSize0L;privatestaticListConsumerQueueDataconsumerQueueDatasnewArrayList();privatestaticListIndexFileItemDataindexFileItemDatasnewArrayList();privatestaticintMESSAGECOUNT10;publicstaticvoidmain(String〔〕args)throwsIOException{createCommitLog();createConsumerQueue();createIndexFile();}privatestaticvoidcreateCommitLog()throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123commitLog。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);fileChannel。close();RandomrandomnewRandom();intcount0;for(inti0;iMESSAGECOUNT;i){longcommitLogOffsetlastTotalSize;StringtopicTopictest;StringmsgIdUUID。randomUUID()。toString();StringmsgBody消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg。substring(0,random。nextInt(48));longqueueOffseti;索引偏移量StringtransactionIdUUID。randomUUID()。toString();数据格式,位置固定inttotalSize;消息长度StringmsgId;Stringtopic;longqueueOffset;索引偏移量longbodySize;消息长度byte〔〕body;消息内容StringtransactionId;longcommitLogOffset;从第一个文件开始算的偏移量inttotalSize8totalSize长度64msgId长度64topic长度8索引偏移量长度8消息长度长度msgBody。getBytes(StandardCharsets。UTF8)。length消息内容长度64transactionId长度64commitLogOffset长度;;ByteBufferbByteBuffer。allocate(totalSize);如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300mappedByteBuffer。position(Integer。valueOf(commitLogOffset));b。putLong(totalSize);totalSizeb。put(getBytes(msgId,64));msgIdb。put(getBytes(topic,64));topic,定长64b。putLong(queueOffset);索引偏移量b。putLong(msgBody。getBytes(StandardCharsets。UTF8)。length);bodySizeb。put(msgBody。getBytes(StandardCharsets。UTF8));bodyb。put(getBytes(transactionId,64));b。putLong(commitLogOffset);bodySizeb。flip();mappedByteBuffer。put(b);lastTotalSizetotalSizelastTotalSize;System。out。println(写入消息,第:i次);System。out。println(totalSize:totalSize);System。out。println(msgId:msgId);System。out。println(topic:topic);System。out。println(msgBody:msgBody);System。out。println(transactionId:transactionId);System。out。println(commitLogOffset:commitLogOffset);ConsumerQueueDataconsumerQueueDatanewConsumerQueueData();consumerQueueData。setOffset(commitLogOffset);consumerQueueData。setMsgLength(totalSize);consumerQueueData。setTagCode(100L);准备生成consumeQueue文件consumerQueueDatas。add(consumerQueueData);IndexFileItemDataindexFileItemDatanewIndexFileItemData();indexFileItemData。setKeyHash(msgId。hashCode());indexFileItemData。setMessageId(msgId);indexFileItemData。setPhyOffset(commitLogOffset);准备生成indexFile文件indexFileItemDatas。add(indexFileItemData);mappedByteBuffer。force();count;}System。out。println(commitLog数据保存完成,totalSize:count);}publicstaticvoidcreateConsumerQueue()throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123consumerQueue。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();intcount0;for(inti0;iconsumerQueueDatas。size();i){ConsumerQueueDataconsumerQueueDataconsumerQueueDatas。get(i);指定写入位置mappedByteBuffer。position(i20);mappedByteBuffer。putLong(consumerQueueData。getOffset());8byte(commitlogoffset)mappedByteBuffer。putInt(consumerQueueData。getMsgLength());4byte(msgLength)mappedByteBuffer。putLong(consumerQueueData。getTagCode());8byte(tagCode)count;System。out。println(consumerQueue数据写入完成:JSON。toJSONString(consumerQueueData));mappedByteBuffer。force();}System。out。println(ConsumerQueue数据保存完成count:count);}publicstaticvoidcreateIndexFile()throwsIOException{文件场创建时间,在写第一条消息的时候创建FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123index。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);ByteBufferheaderByteBuffermappedByteBuffer。slice();longfirstDataTimeSystem。currentTimeMillis();fileChannel。close();开始写hash槽,从头部后写入已经填充有index的slot数量(并不是每个slot槽下都挂载有index索引单元,这里统计的是所有挂载了index索引单元的slot槽的数量,hash冲突)inthashSlotCount0;已该indexFile中包含的索引单元个数(统计出当前indexFile中所有slot槽下挂载的所有index索引单元的数量之和),如果没有hash冲突,hashSlotCountindexCountintindexCount0;假设建立100个槽位(总长度400)intsoltNum100;for(inti0;iMESSAGECOUNT;i){IndexFileItemDataindexFileItemDataindexFileItemDatas。get(i);intkeyHashindexFileItemData。getKeyHash();取模,计算第几个槽位intslotPoskeyHash1000?keyHash100:1(keyHash100);slot存放第几条数据的的位置(字节数组位置)intabsSlotPos40slotPos4;存储实际数据的位置(字节数组位置)intabsIndexPos40soltNum4indexCount20;将hash槽的值设置为indexCount,建立索引,即第n条消息保存在XX位置mappedByteBuffer。putInt(absSlotPos,indexCount);写入数据mappedByteBuffer。putInt(absIndexPos,indexFileItemData。getKeyHash());4bytemsghashcodemappedByteBuffer。putLong(absIndexPos4,indexFileItemData。getPhyOffset());8bytepyhoffsetmappedByteBuffer。putInt(absIndexPos48,Integer。valueOf((System。currentTimeMillis()firstDataTime)));4byte(timeDiff)mappedByteBuffer。putInt(absIndexPos484,0);4byte(preIndex),暂置0,暂不考虑hash冲突的情况模拟最后一个文件,写入headerif(i0){该indexFile中第一条消息的存储时间headerByteBuffer。putLong(0,firstDataTime);该indexFile种第一条消息在commitlog种的偏移量commitlogoffsetmappedByteBuffer。putLong(16,indexFileItemData。getPhyOffset());}模拟最后一个文件,写入headerif(i99){该indexFile种最后一条消息存储时间headerByteBuffer。putLong(8,System。currentTimeMillis());该indexFile中最后一条消息在commitlog中的偏移量commitlogoffsetheaderByteBuffer。putLong(24,indexFileItemData。getPhyOffset());}已经填充有index的slot数量headerByteBuffer。putInt(32,hashSlotCount1);该indexFile中包含的索引单元个数headerByteBuffer。putInt(36,indexCount1);mappedByteBuffer。force();System。out。println(msgId:indexFileItemData。getMessageId(),keyHash:keyHash,保存槽位为slotPos的数据,absSlotPosabsSlotPos,值indexindexCount,绝对位置:absIndexPos,commitphyOffset:indexFileItemData。getPhyOffset());indexCount;hashSlotCount;}}将变长字符串定长byte〔〕,方便读取privatestaticbyte〔〕getBytes(Strings,intlength){intfixLengthlengths。getBytes()。length;if(s。getBytes()。lengthlength){byte〔〕Sbytesnewbyte〔length〕;System。arraycopy(s。getBytes(),0,Sbytes,0,s。getBytes()。length);for(intxlengthfixLength;xlength;x){Sbytes〔x〕0x00;}returnSbytes;}returns。getBytes(StandardCharsets。UTF8);}}
  运行结果:写入消息,第:0次totalSize:322msgId:61b5c500f7f54bfebeb150a8148534c0topic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgtransactionId:9453ba39398240e9926d47b51d360590commitLogOffset:0写入消息,第:1次totalSize:306msgId:d0fbf80f223b4721a43e518b152decc2topic:TopictestmsgBody:消息内容msgmsgmsgmsgmstransactionId:e2ef165258fa4849bf74885c7e5db9e3commitLogOffset:322写入消息,第:2次totalSize:307msgId:199053e3e6164611ab0de5c7af4549a9topic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgtransactionId:33d21abe0d8e4c0e9c78f415daefd767commitLogOffset:628写入消息,第:3次totalSize:339msgId:8e799d8e32904f6bab5d289153446994topic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:98b46b96cc884969a56f282d25799085commitLogOffset:935写入消息,第:4次totalSize:320msgId:8b78474fb28a444299a06f7883f0302btopic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:5c0ff6feaea340c386476f4bdd797a78commitLogOffset:1274写入消息,第:5次totalSize:312msgId:b33c6f31cc96462bb09599410459082ctopic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmstransactionId:57420047253943faa3f2b2f55c7b059ccommitLogOffset:1594写入消息,第:6次totalSize:324msgId:d0a6803f8555418e988ab3b9a70d14f0topic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:296013353fcd4193b14f140bbaf409a4commitLogOffset:1906写入消息,第:7次totalSize:293msgId:91151ec5e76b456090b7ab77f9d04c9atopic:TopictestmsgBody:消息内容mtransactionId:291e54de2ebe41b1b974e81a2e9f1370commitLogOffset:2230写入消息,第:8次totalSize:323msgId:eb21df35b4dc43aa8604e9a103f25a7btopic:TopictestmsgBody:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:3005a39db8cb4138ae0534b65fc135a2commitLogOffset:2523写入消息,第:9次totalSize:296msgId:abcda3648fc34d18ae5d1d7a8ffd0929topic:TopictestmsgBody:消息内容msgmtransactionId:d42733b539114f0ab1db11eb45a30345commitLogOffset:2846commitLog数据保存完成,totalSize:10创建consumerQueue文件开始consumerQueue数据写入完成:{msgLength:322,offset:0,tagCode:100}consumerQueue数据写入完成:{msgLength:306,offset:322,tagCode:100}consumerQueue数据写入完成:{msgLength:307,offset:628,tagCode:100}consumerQueue数据写入完成:{msgLength:339,offset:935,tagCode:100}consumerQueue数据写入完成:{msgLength:320,offset:1274,tagCode:100}consumerQueue数据写入完成:{msgLength:312,offset:1594,tagCode:100}consumerQueue数据写入完成:{msgLength:324,offset:1906,tagCode:100}consumerQueue数据写入完成:{msgLength:293,offset:2230,tagCode:100}consumerQueue数据写入完成:{msgLength:323,offset:2523,tagCode:100}consumerQueue数据写入完成:{msgLength:296,offset:2846,tagCode:100}ConsumerQueue数据保存完成count:10创建索引文件开始msgId:61b5c500f7f54bfebeb150a8148534c0,keyHash:249765627,保存槽位为27的数据,absSlotPos148,值index0,绝对位置:440,commitphyOffset:0msgId:d0fbf80f223b4721a43e518b152decc2,keyHash:1587335015,保存槽位为15的数据,absSlotPos100,值index1,绝对位置:460,commitphyOffset:322msgId:199053e3e6164611ab0de5c7af4549a9,keyHash:791210473,保存槽位为73的数据,absSlotPos332,值index2,绝对位置:480,commitphyOffset:628msgId:8e799d8e32904f6bab5d289153446994,keyHash:1460275929,保存槽位为29的数据,absSlotPos156,值index3,绝对位置:500,commitphyOffset:935msgId:8b78474fb28a444299a06f7883f0302b,keyHash:1174005465,保存槽位为65的数据,absSlotPos300,值index4,绝对位置:520,commitphyOffset:1274msgId:b33c6f31cc96462bb09599410459082c,keyHash:1695757800,保存槽位为0的数据,absSlotPos40,值index5,绝对位置:540,commitphyOffset:1594msgId:d0a6803f8555418e988ab3b9a70d14f0,keyHash:1334295408,保存槽位为8的数据,absSlotPos72,值index6,绝对位置:560,commitphyOffset:1906msgId:91151ec5e76b456090b7ab77f9d04c9a,keyHash:1287318090,保存槽位为90的数据,absSlotPos400,值index7,绝对位置:580,commitphyOffset:2230msgId:eb21df35b4dc43aa8604e9a103f25a7b,keyHash:239865974,保存槽位为74的数据,absSlotPos336,值index8,绝对位置:600,commitphyOffset:2523msgId:abcda3648fc34d18ae5d1d7a8ffd0929,keyHash:1173357775,保存槽位为75的数据,absSlotPos340,值index9,绝对位置:620,commitphyOffset:2846
  2。读取consumeQueue文件,并根据offset从commitLog读取一条完整的消息packageorg。apache。rocketmq。test。smoke;importjava。io。IOException;importjava。net。URI;importjava。nio。ByteBuffer;importjava。nio。CharBuffer;importjava。nio。MappedByteBuffer;importjava。nio。channels。FileChannel;importjava。nio。charset。Charset;importjava。nio。charset。StandardCharsets;importjava。nio。file。Paths;importjava。nio。file。StandardOpenOption;publicclassConsumeQueueMessageReadTest{publicstaticMappedByteBuffermappedByteBuffernull;privatestaticintMESSAGECOUNT10;publicstaticvoidmain(String〔〕args)throwsIOException{FileChannelfileChannelFileChannel。open(Paths。get(URI。create(file:c:123consumerQueue。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBuffermappedByteBufferfileChannel。map(FileChannel。MapMode。READWRITE,0,4096);fileChannel。close();mappedByteBuffer。position(0);根据索引下标读取索引,实际情况是用户消费的最新点位,存在在broker的偏移量文件中intindex0;for(intiindex;iMESSAGECOUNT;i){mappedByteBuffer。position(i20);longcommitlogOffsetmappedByteBuffer。getLong();System。out。println(commitlogOffset);longmsgLenmappedByteBuffer。getInt();LongtagmappedByteBuffer。getLong();System。out。println(读取到consumerQueue,commitlogOffset:commitlogOffset,msgLen:msgLen);根据偏移量读取CcommitLogreadCommitLogByOffset(Integer。valueOf(commitlogOffset));}}publicstaticMappedByteBufferinitFileChannel()throwsIOException{if(mappedByteBuffernull){FileChannelcommitLogfileChannelFileChannel。open(Paths。get(URI。create(file:c:123commitLog。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);mappedByteBuffercommitLogfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);commitLogfileChannel。close();}returnmappedByteBuffer;}根据偏移量读取commitLogpublicstaticvoidreadCommitLogByOffset(intoffset)throwsIOException{存放顺序,读时候保持顺序一致b。putLong(totalSize);totalSizeb。put(getBytes(msgId,64));msgIdb。put(getBytes(topic,64));topic,定长64b。putLong(queueOffset);索引偏移量b。putLong(msgBody。getBytes(StandardCharsets。UTF8)。length);bodySizeb。put(msgBody。getBytes(StandardCharsets。UTF8));bodyb。put(getBytes(transactionId,64));b。putLong(commitLogOffset);commitLogOffsetSystem。out。println(commitlog读取偏移量为offset的消息);MappedByteBuffermappedByteBufferinitFileChannel();mappedByteBuffer。position(offset);longtotalSizemappedByteBuffer。getLong();消息长度byte〔〕msgIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(msgIdByte);byte〔〕topicBytenewbyte〔64〕;固定是64mappedByteBuffer。get(topicByte);longqueueOffsetmappedByteBuffer。getLong();LongbodySizemappedByteBuffer。getLong();intbSizeInteger。valueOf(bodySize);byte〔〕bodyBytenewbyte〔bSize〕;bodySize长度不固定mappedByteBuffer。get(bodyByte);byte〔〕transactionIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(transactionIdByte);longcommitLogOffsetmappedByteBuffer。getLong();偏移量System。out。println(totalSize:totalSize);System。out。println(msgId:newString(msgIdByte));System。out。println(topic:newString(topicByte));System。out。println(queueOffset:queueOffset);System。out。println(bodySize:bodySize);System。out。println(body:newString(bodyByte));System。out。println(transactionId:newString(transactionIdByte));System。out。println(commitLogOffset:commitLogOffset);}}
  运行结果:commitlog读取偏移量为0的消息totalSize:322msgId:61b5c500f7f54bfebeb150a8148534c0topic:TopictestqueueOffset:0bodySize:42body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgtransactionId:9453ba39398240e9926d47b51d360590commitLogOffset:0commitlog读取偏移量为322的消息totalSize:306msgId:d0fbf80f223b4721a43e518b152decc2topic:TopictestqueueOffset:1bodySize:26body:消息内容msgmsgmsgmsgmstransactionId:e2ef165258fa4849bf74885c7e5db9e3commitLogOffset:322commitlog读取偏移量为628的消息totalSize:307msgId:199053e3e6164611ab0de5c7af4549a9topic:TopictestqueueOffset:2bodySize:27body:消息内容msgmsgmsgmsgmsgtransactionId:33d21abe0d8e4c0e9c78f415daefd767commitLogOffset:628commitlog读取偏移量为935的消息totalSize:339msgId:8e799d8e32904f6bab5d289153446994topic:TopictestqueueOffset:3bodySize:59body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:98b46b96cc884969a56f282d25799085commitLogOffset:935commitlog读取偏移量为1274的消息totalSize:320msgId:8b78474fb28a444299a06f7883f0302btopic:TopictestqueueOffset:4bodySize:40body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:5c0ff6feaea340c386476f4bdd797a78commitLogOffset:1274commitlog读取偏移量为1594的消息totalSize:312msgId:b33c6f31cc96462bb09599410459082ctopic:TopictestqueueOffset:5bodySize:32body:消息内容msgmsgmsgmsgmsgmsgmstransactionId:57420047253943faa3f2b2f55c7b059ccommitLogOffset:1594commitlog读取偏移量为1906的消息totalSize:324msgId:d0a6803f8555418e988ab3b9a70d14f0topic:TopictestqueueOffset:6bodySize:44body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:296013353fcd4193b14f140bbaf409a4commitLogOffset:1906commitlog读取偏移量为2230的消息totalSize:293msgId:91151ec5e76b456090b7ab77f9d04c9atopic:TopictestqueueOffset:7bodySize:13body:消息内容mtransactionId:291e54de2ebe41b1b974e81a2e9f1370commitLogOffset:2230commitlog读取偏移量为2523的消息totalSize:323msgId:eb21df35b4dc43aa8604e9a103f25a7btopic:TopictestqueueOffset:8bodySize:43body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:3005a39db8cb4138ae0534b65fc135a2commitLogOffset:2523commitlog读取偏移量为2846的消息totalSize:296msgId:abcda3648fc34d18ae5d1d7a8ffd0929topic:TopictestqueueOffset:9bodySize:16body:消息内容msgmtransactionId:d42733b539114f0ab1db11eb45a30345commitLogOffset:28
  3。根据messageId读取indexFile,然后根据偏移量从CommitLog读取一条完整的消息packageorg。apache。rocketmq。test。smoke;importjava。io。IOException;importjava。net。URI;importjava。nio。MappedByteBuffer;importjava。nio。channels。FileChannel;importjava。nio。file。Paths;importjava。nio。file。StandardOpenOption;publicclassIndexFileMessageReadTest{publicstaticMappedByteBuffermappedByteBuffernull;publicstaticvoidmain(String〔〕args)throwsIOException{StringmsgId8b78474fb28a444299a06f7883f0302b;readByMessageId(msgId);}privatestaticvoidreadByMessageId(StringmessageId)throwsIOException{FileChannelindexFileChannelFileChannel。open(Paths。get(URI。create(file:c:123index。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);MappedByteBufferindexMappedByteBufferindexFileChannel。map(FileChannel。MapMode。READWRITE,0,4096);indexFileChannel。close();System。out。println(getindexFileheader);System。out。println(beginTimestampIndex:indexMappedByteBuffer。getLong());System。out。println(endTimestampIndex:indexMappedByteBuffer。getLong());System。out。println(beginPhyoffsetIndex:indexMappedByteBuffer。getLong());System。out。println(endPhyoffsetIndex:indexMappedByteBuffer。getLong());System。out。println(hashSlotcountIndex:indexMappedByteBuffer。getInt());System。out。println(indexCountIndex:indexMappedByteBuffer。getInt());System。out。println();intkeyHashmessageId。hashCode();取模,计算第几个槽位intslotPoskeyHash1000?keyHash100:1(keyHash100);System。out。println(messageId:messageId,取模为:slotPos);slot在文件中的字节数组位置intabsSlotPos40slotPos4;System。out。println(哈希槽的字节数组位置:(40slotPos4)absSlotPos);获取hash槽上存取的件索引,第几个文件intindexindexMappedByteBuffer。getInt(absSlotPos);计算数据需要存储的偏移量intabsIndexPos401004index20;System。out。println(第几个文件indexindex,实际存储数据的字节数组位置:(401004index20)absIndexPos);longkeyHash1indexMappedByteBuffer。getInt(absIndexPos);longpyhOffsetindexMappedByteBuffer。getLong(absIndexPos4);inttimeDiffindexMappedByteBuffer。getInt(absIndexPos48);intpreIndexNoindexMappedByteBuffer。getInt(absIndexPos484);System。out。println(从index获取到的commitLog偏移量为:pyhOffset);System。out。println();readCommitLogByOffset((int)pyhOffset);}publicstaticMappedByteBufferinitFileChannel()throwsIOException{if(mappedByteBuffernull){FileChannelcommitLogfileChannelFileChannel。open(Paths。get(URI。create(file:c:123commitLog。txt)),StandardOpenOption。WRITE,StandardOpenOption。READ);mappedByteBuffercommitLogfileChannel。map(FileChannel。MapMode。READWRITE,0,409600);commitLogfileChannel。close();}returnmappedByteBuffer;}根据偏移量读取CcommitLogpublicstaticvoidreadCommitLogByOffset(intoffset)throwsIOException{存放顺序,读时候保持顺序一致b。putLong(totalSize);totalSizeb。put(getBytes(msgId,64));msgIdb。put(getBytes(topic,64));topic,定长64b。putLong(queueOffset);索引偏移量b。putLong(msgBody。getBytes(StandardCharsets。UTF8)。length);bodySizeb。put(msgBody。getBytes(StandardCharsets。UTF8));bodyb。put(getBytes(transactionId,64));b。putLong(commitLogOffset);commitLogOffsetSystem。out。println(commitlog读取偏移量为offset的消息);MappedByteBuffermappedByteBufferinitFileChannel();mappedByteBuffer。position(offset);longtotalSizemappedByteBuffer。getLong();消息长度byte〔〕msgIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(msgIdByte);byte〔〕topicBytenewbyte〔64〕;固定是64mappedByteBuffer。get(topicByte);longqueueOffsetmappedByteBuffer。getLong();LongbodySizemappedByteBuffer。getLong();intbSizeInteger。valueOf(bodySize);byte〔〕bodyBytenewbyte〔bSize〕;bodySize长度不固定mappedByteBuffer。get(bodyByte);byte〔〕transactionIdBytenewbyte〔64〕;uuid固定是64mappedByteBuffer。get(transactionIdByte);longcommitLogOffsetmappedByteBuffer。getLong();偏移量System。out。println(totalSize:totalSize);System。out。println(msgId:newString(msgIdByte));System。out。println(topic:newString(topicByte));System。out。println(queueOffset:queueOffset);System。out。println(bodySize:bodySize);System。out。println(body:newString(bodyByte));System。out。println(transactionId:newString(transactionIdByte));System。out。println(commitLogOffset:commitLogOffset);}publicstaticbyte〔〕toByteArray(longnumber){bytelengthLong。BYTES;byte〔〕bytesnewbyte〔length〕;for(bytei0;ilength;i){bytes〔length1i〕(byte)number;number8;}returnbytes;}}
  运行结果:getindexFileheaderbeginTimestampIndex:1669554286826endTimestampIndex:1669552196010beginPhyoffsetIndex:0endPhyoffsetIndex:31259hashSlotcountIndex:10indexCountIndex:10messageId:8b78474fb28a444299a06f7883f0302b,取模为:65哈希槽的字节数组位置:(40654)300第几个文件index4,实际存储数据的字节数组位置:(401004index20)520从index获取到的commitLog偏移量为:1274commitlog读取偏移量为1274的消息totalSize:320msgId:8b78474fb28a444299a06f7883f0302btopic:TopictestqueueOffset:4bodySize:40body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:5c0ff6feaea340c386476f4bdd797a78commitLogOffset:1274
  本文基于javaNIO实现了RocketMQ的文件系统的最精简的实现,希望能帮助相关开发人员了解文件系统底层的实现原理。欢迎一起交流讨论,不足的地方欢迎指正。

1年烧掉近百亿!华为汽车业务再曝人事巨变,余承东智选车模式成最终赢家?本文来源时代财经作者林前华为智能汽车业务人事变动消息再起。据媒体近日报道,华为智能汽车解决方案BU(以下简称华为车BU)COO智能驾驶解决方案产品线总裁王军已被停职。另据媒体援引华倒计时一周关于近期BTC没有上车的朋友以下给出以下看法,不代表投资建议在上图位置没上车的人下面给出另外一个上车点大概还有1个礼拜的盘整时间上车开到2250022700附近看到30000压力区不结婚也可以生育登记不结婚也可以生育登记了,现在真的越来越随便了,哦不对,是越来越简便了。从计划生育到鼓励生育再到取消结婚限制生育登记,步子迈的好大,你们跟得上没?近日,四川省卫健委发布四川省生育登记用于测试通信协议的免费工具对于那些使用自动化系统SCADA等的人来说,使用通信协议配置设备之间的数据交换是一项常规任务。通常,客户端和服务器设备之间的这些集成并不容易获得。出现问题时,始终存在原因出在客户端萨尔瓦多区块链布局已见成效,8亿美元债券按时偿还萨尔瓦多作为区块链行业的顶流,但是传统主流媒体先前都对其债务(萨尔瓦多于1月份有8亿美元债券到期,另外还需支付对应利息)按时偿还表示悲观,但是现如今都对其债务按时清偿选择性失明,可将ChatGPT引入必应,微软市值一夜飙涨5450亿!微软CEO这是搜索引擎新的一天中国经济周刊经济网讯据报道,美东时间2月7日,科技股龙头微软收涨逾4,一夜市值飙涨超800亿美元(约5450亿元人民币),最新总市值1。99万亿美元,为5个月新高。当天,微软宣布推分子骨架编辑N原子删除本文来自微信公众号XMOLNews有机化合物的后期修饰官能团化能够为分子骨架带来丰富的结构多样性,从而避免了复杂的多步合成以及三废的产生。最近几十年来,科学家在分子编辑和后期官能团随笔丨朱克俭边关复边关祁连山边关复边关祁连山文朱克俭祁连山中秋。此行最后一站过祁连山。一清早,夫人便冲了杯葡萄糖。说带路上,据说可防高原反应。上车,朋友们争坐后排,后排颠簸,闷。我是小伙子,理所当然占到一席。微信淘宝互联互通新进展微信文章底部新增淘宝入口Tech星球2月7日消息,据电商报,近日淘宝在微信中又多了一个新的流量入口,就在微信文章底部的广告位,点击该位置的淘宝页面之后,可直接从微信跳转到淘宝内完成购买,且能返回淘宝首页浏不惧美国全面断供,华为有机会胜利,国产光刻机进展顺利从限制华为,到打压华为,再到全面封锁华为,美国的心态正在一步步发生变化,从狂妄自大的傲慢,到忘乎所以的自信,再到近乎疯狂的举动,可以明显看出,美国心态长在逐渐崩溃。原本是手到擒来,福建漳州第一大民企超越福耀玻璃,年进账402。40亿,老板宁德人2月财经新势力漳州,福建省辖地级市,位于福建省最南部,属于属闽南地区,是闽南文化发祥地之一,著名鱼米花果之乡,其生态竞争力位居全省前列。漳州辖区内有324319二条国道厦漳漳诏漳龙
豪横!弗格一家带72个超重行李回中国,海关人员惊呆我能拍个照吗?1月29日消息,日前,辽宁队外援弗格带着妻子和女儿从美国返回中国,准备参加第三阶段比赛,结果弗格一家人竟然带了72个行李,把海关工作人员都给震惊了。在春节之前,CBA第二阶段比赛结创造历史焦科维奇成就澳网十冠王新华社墨尔本1月29日电(记者郝亚琳韦骅)在29日的2023年澳网男单决赛中,塞尔维亚天王焦科维奇30击败3号种子希腊人齐齐帕斯,创造澳网十冠王的历史性纪录。1月29日,焦科维奇在继孙杨被禁赛之后,游泳女神也违规被禁,5年之久恐被迫退役每一个新的奥运周期,世界体坛都将会进行更新换代,对于中国体育来说同样如此。毕竟在竞技体育面前,年龄是最经不起考验,也无法回避的残酷现实,就算是顶级传奇也无法逃脱年龄带来的影响,都会印尼大师赛国羽2冠2亚,女双新锐抢眼!黄东萍组合混双登顶北京时间1月29日,2023年世界羽联世界巡回赛超级500级别的印尼大师赛落幕,中国队拿到2个冠军和2个亚军。刘圣书张殊贤夺得女双冠军,冯彦哲黄东萍拿到混双冠军,中国队在混双和男双爱德华兹不想毒奶自己,但最近就是咋投都进在今日的常规赛中,森林狼主场117111力克国王。本场比赛,森林狼后卫安东尼爱德华兹上场38分钟,21投14中,得到34分10篮板6助攻,赛后他接受了记者采访。谈到自己和迪阿隆福克讲好中国冰雪运动和科技创新的新时代故事访大型科普融媒体项目人民冰雪冰雪科技谈负责人彭元元来源人民周刊彭元元,人民网人民体育事业部总监中华少年强体育影视新时代工程主任。在北京冬奥会筹办期间管理2018中国冰球发展高峰论坛2019中芬冰雪合作论坛2019中芬冰球论坛等多个安贤洙现状揭晓!回韩处处遭排挤,找工作碰壁,40岁妻子把他坑惨在短道速滑的历史上,安贤洙无疑拥有举足轻重的地位。在运动员时期,安贤洙是史无前例的冬奥会六金王,他更是顶着重压变更国籍,代表俄罗斯击垮了老东家韩国。退役成为教练员后,安贤洙又帮助处为花街保驾护航,西乡志愿者化身护花使者2023年宝安迎春花市回归。为了让市民可以开心安全逛花市,过一年幸福吉祥年,西乡街道新时代文明实践所西乡团工委西乡街道志愿者联合会组织开展了行花街保驾护航志愿服务活动。从1月15日北京冬奥一周年,巴赫高度评价开启全球冰雪运动新时代视频加载中中国青年报客户端北京2月4日电(中青报中青网记者慈鑫)一年前的今天,北京2022年冬奥会隆重开幕。在今晚举行的纪念北京2022年冬奥会成功举办一周年系列活动启动仪式上,国CBA最新消息!辽宁新外援确定,北京无意换外援,马布里确定下课在CBA联赛的这个窗口期里,不少球队都传出了新的消息,有的球队在酝酿换帅,有的球队则是会在外援方面进行调整。目前CBA联赛有三大最新消息,首先就是辽宁男篮已经确定了新外援人选,并非CBA三消息北京打铁外援留队,李京龙两选择,CBA啦啦队值得期待今年北京队的战绩还是超出预期的,这得益于大外援利夫的加入,与天才小将曾凡博回国驰援,目前北京队17胜11负名列八强,事实上除了前三,第四到第八都是17胜积分45分。许多北京球迷,觉
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网