专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

阿里二面RockeMQ是如何通过mmap大幅提升单机吞吐量的

  今天抽空给大家整理了一份关于RocketMQ的高性能知识点文章总结。希望能对各位读者有所帮助。
  关于RockeMQ的基本介绍
  简介
  RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
  发展背景2011年:业界出现了现在被很多大数据领域所推崇的Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ1。0版本,主要是用于解决顺序消息和海量堆积的问题。2012年:阿里巴巴开源其自研的第三代分布式消息中间件RocketMQ。2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。2017年2月20日:RocketMQ正式发布4。0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。2022年:RocketMQ正式发布5。0版本,这也是目前最新版本。
  RocketMQ的各个特点单机吞吐量:10ws。可用性:支持双主双从的分布式架构,具备高可用特性。支持使用topic,tag,SQL来对消息进行筛选。底层结构通过多队列来承载消息存储等特性。可靠的FIFO和严格有序的消息队列中间件。支持多种消息传递协议,例如grpc,Mqtt,Jms。。。。源码实现:Java语言。
  PageCache和Mmap
  RocketMQ这款中间件具有着单机10w的吞吐量,其底层原因,实际上得从操作系统原理开始和大家讲起。
  顺序写入
  磁盘的写步骤通常是:CPU发送一个写信号给到磁盘磁头,接着磁头需要进行寻道操作,找到对应的磁道后,定位对应的位置进行数据写入。所以如果数据是随机写入的话,磁头就需要频繁地切换盘道进行数据的写入,整体耗时会有所提升。
  顺序写,其实是一种非常常见的提升IO写性能的方式,利用连续的写入地址,从而减少磁头的切换次数,提升性能。
  PageCache
  为了提升对文件的读写效率,Linux内核会以页大小(4KB)为单位,将文件划分为多数据块。当用户对文件中的某个数据块进行读写操作时,内核首先会申请一个内存页(称为页缓存)与文件中的数据块进行绑定。
  例如下边这张图,当我们发起一次系统调用的write方法,想要将用户态中的数据写入磁盘的时候,其实是需要发生以下操作的:
  首先,将用户地址空间的数据通过CPU拷贝,放入到内核空间中,并且写入一个PageCache里面,然后通过DMA去将PageCache的数据写入到磁盘。
  这里面由于有CPU拷贝这样的重操作,所以想要提升吞吐量,必须解决这个问题。而RocketMQ的创作团队,则是通过mmap技术来解决了它。
  什么是mmap
  mmap系统调用,让用户地址空间,跟文件做映射(实际是指向不存在的物理内存)。将内核态的一段空间地址映射到了用户态中,这样数据只需要写入到用户态的这段虚拟地址中,接着内核空间的DMA会将这段数据写入到磁盘中。这样之后,整体的写入流程就如下图所示:
  使用mmap技术之后,可以减少一次的CPU拷贝次数,提升性能。
  这里补充一些说明,其实DMA本质上是一块访问独立的芯片。由于每次访问磁盘进行IO操作都会导致CPU大量的空闲时间,而DMA则是用于提升IO操作效率的一个角色,主要用于IO的数据传输,降低CPU等待时间。
  Java里面如何使用mmap技术
  在Java语言中,其实很早就有提供mmap方面的api了,下边是一段简单的mmap使用案例。publicclassMmapUtils{publicFilecommitLogFile;publicMappedByteBuffermappedByteBuffer;publicintmappedSize0;publicintwritePos0;publicMmapUtils(StringcommitLogPath,intmappedSize){this。commitLogFilenewFile(commitLogPath);if(!commitLogFile。exists()){try{commitLogFile。createNewFile();}catch(IOExceptione){e。printStackTrace();}}this。mappedSizemappedSize;try{mappedByteBuffernewRandomAccessFile(commitLogFile,rw)。getChannel()。map(FileChannel。MapMode。READWRITE,0,mappedSize);}catch(IOExceptione){e。printStackTrace();}}往磁盘写数据paramcontentreturnpublicintwriteFile(Stringcontent){mappedByteBuffer。put(content。getBytes());强制刷盘mappedByteBuffer。force();writePosmappedByteBuffer。position();return1;}从磁盘中读取数据paramlenreturnpublicbyte〔〕readContent(intlen){mappedByteBuffer。position(0);byte〔〕destnewbyte〔len〕;intj0;for(inti0;imappedSize;i){bytebmappedByteBuffer。get();if(b!0){dest〔j〕b;}}returndest;}}
  mmap存在的缺陷
  其实使用mmap技术还是存在一些缺陷的。
  导致缺页中断问题
  我们知道,在操作系统的中,数据通常都是被放在磁盘中的,只有在需要计算的时候,才会将数据加载到内存中,而每次加载的单位都是以页作为基础,假设我们需要访问一块存在于磁盘,但是没有被加载到内存中的数据,这种情况,我们称之为软性的缺页中断。如果数据是存在于内存,但是该页的地址没有被注册到MMU中,我们则称之为硬性的缺页中断。
  总之不管是软性还是硬性的中断,都需要重新建立一次数据的内存映射,比较消耗性能。
  mmap对于文件的大小有一定要求
  使用mmap技术之后,我们不可以使用2gb大小以上的文件去做映射,同时文件的长度也不建议做变长,最好是固定的大小。最后对于小文件而言,使用mmap可能性能还不如直接的原始IO操作。
  RocketMQ对mmap的优化
  好了,现在我们了解了mmap存在的缺陷之后,来看看RocketMQ是如何解决这些不足点的。
  预映射
  在RocketMQ的源代码里,可以看到这样一份代码,它的名字叫做:org。apache。rocketmq。store。AllocateMappedFileService。这个类负责在rocketmq启动时,预先分配mmap的文件映射。
  文件预热
  调用mmap进行内存映射后,OS只是建立虚拟内存地址至物理地址的映射表,没有实际加载任何文件至内存。依靠一次缺页加载4K,1G的commitLog需要发生256次缺页中断。而在RocketMQ的源代码中,进行了madvise系统调用,其目的是使操作系统做一次内存映射后对应的文件数据尽可能多地预加载进内存,从而实现预热。
  内存锁定
  将进程使用的部分或者全部的地址空间锁定在物理内存中,并会先写入一些随机值到mmap映射出的内存空间里,防止其被交换到swap空间。基于mlock系统调用。
  堆外缓存
  到这里,我们了解了RocketMQ内部通过使用PageCache去让我们的数据写入如写入内存般轻松,但是这在极端情况下,可能会有出现频繁出现缺页中断的情况,以及PageCache阻塞,这种情况下,Broker节点会返回一个SYSTEMBUSY的信号给到客户端,不过这类场景在实际生产中很少会出现。为了避免这种情况,RocketMQ底层增加了一套堆外缓存来优化这类场景。当PageCache写入阻塞的时候,可以选择写入到堆外缓存中,之后再从堆外缓存(DirectByteBuffer)写入到PageCache。
  RocketMQ中的mmap预分配实现分析
  下边让我们来看看RocketMQ底层是如何进行mmap的文件映射预分配的。在RocketMQ的源代码中,当服务启动之后,AllocateMappedFileService这个线程类便会开始执行。
  线程启动之后,便会执行mmapOperation方法,这个方法的大致步骤如下:从优先级队列中获取AllocateRequest创建MappedFile根据配置是否预热MappedFile(填充0字节),将MappedFile放入到AllocateRequest如果出现IOException将AllocateRequest重新放入优先级队列调用AllocateRequest的CountDownLatchcountDown方法通知putRequestAndReturnMappedFile线程
  整体执行目的总结起来就是:初始化预热mappedFile。
  关于mmapOperation的源代码和其含义,我列了一份出来给各位读者查看:Onlyinterruptedbytheexternalthread,willreturnfalseprivatebooleanmmapOperation(){booleanisSuccessfalse;AllocateRequestreqnull;try{从优先级队列里获取AllocateRequestreqthis。requestQueue。take();从Map里获取AllocateRequestAllocateRequestexpectedRequestthis。requestTable。get(req。getFilePath());if(nullexpectedRequest){log。warn(thismmaprequestexpired,maybecausetimeoutreq。getFilePath()req。getFileSize());returntrue;}putRequestAndReturnMappedFile里map与优先级队列并不是强一致,是最终一致的if(expectedRequest!req){log。warn(neverexpectedhere,maybecausetimeoutreq。getFilePath()req。getFileSize(),req:req,expectedRequest:expectedRequest);returntrue;}if(req。getMappedFile()null){longbeginTimeSystem。currentTimeMillis();MappedFilemappedFile;堆外内存if(messageStore。getMessageStoreConfig()。isTransientStorePoolEnable()){try{mappedFileServiceLoader。load(MappedFile。class)。iterator()。next();mappedFile。init(req。getFilePath(),req。getFileSize(),messageStore。getTransientStorePool());}catch(RuntimeExceptione){log。warn(Usedefaultimplementation。);mappedFilenewMappedFile(req。getFilePath(),req。getFileSize(),messageStore。getTransientStorePool());}}else{mappedFilenewMappedFile(req。getFilePath(),req。getFileSize());}longeclipseTimeUtilAll。computeEclipseTimeMilliseconds(beginTime);创建MappedFile花费大于10ms打印日志if(eclipseTime10){intqueueSizethis。requestQueue。size();log。warn(createmappedFilespenttime(ms)eclipseTimequeuesizequeueSizereq。getFilePath()req。getFileSize());}prewritemappedFile默认warmMapedFileEnablefalse,即默认不预热if(mappedFile。getFileSize()this。messageStore。getMessageStoreConfig()。getMapedFileSizeCommitLog()this。messageStore。getMessageStoreConfig()。isWarmMapedFileEnable()){文件预热mappedFile。warmMappedFile(this。messageStore。getMessageStoreConfig()。getFlushDiskType(),this。messageStore。getMessageStoreConfig()。getFlushLeastPagesWhenWarmMapedFile());}req。setMappedFile(mappedFile);this。hasExceptionfalse;isSuccesstrue;}}catch(InterruptedExceptione){log。warn(this。getServiceName()interrupted,possiblybyshutdown。);this。hasExceptiontrue;returnfalse;}catch(IOExceptione){log。warn(this。getServiceName()servicehasexception。,e);this。hasExceptiontrue;if(null!req){重新插入请求到队列requestQueue。offer(req);try{Thread。sleep(1);}catch(InterruptedExceptionignored){}}}finally{AllocateRequest计数器减一,表示MappedFile已经创建完成if(req!nullisSuccess)req。getCountDownLatch()。countDown();}returntrue;}
  在这段代码里头,我们可以看到有个文件预热的方法,叫做:warmMappedFile,该方法内部最后发出了一次系统调用,mlock方法。publicvoidwarmMappedFile(FlushDiskTypetype,intpages){longbeginTimeSystem。currentTimeMillis();ByteBufferbyteBufferthis。mappedByteBuffer。slice();intflush0;longtimeSystem。currentTimeMillis();for(inti0,j0;ithis。fileSize;iMappedFile。OSPAGESIZE,j){byteBuffer。put(i,(byte)0);forceflushwhenflushdisktypeissyncif(typeFlushDiskType。SYNCFLUSH){if((iOSPAGESIZE)(flushOSPAGESIZE)pages){flushi;mappedByteBuffer。force();}}preventgcif(j10000){log。info(j{},costTime{},j,System。currentTimeMillis()time);timeSystem。currentTimeMillis();try{Thread。sleep(0);}catch(InterruptedExceptione){log。error(Interrupted,e);}}}forceflushwhenprepareloadfinishedif(typeFlushDiskType。SYNCFLUSH){log。info(mappedfilewarmupdone,forcetodisk,mappedFile{},costTime{},this。getFileName(),System。currentTimeMillis()beginTime);mappedByteBuffer。force();}log。info(mappedfilewarmupdone。mappedFile{},costTime{},this。getFileName(),System。currentTimeMillis()beginTime);这里是一个系统调用this。mlock();}
  这段代码的末尾处调用的mlock函数,它的内部是利用了Java的JNI机制,去进行系统调用。
  JNA的本质就是将大多数native的方法封装到jar包中的动态库中,并且提供了一系列的机制来自动加载这个动态库。例如下边这个例子就是基于JNI去调用clib中的print方法:publicclassJNAUsage{publicinterfaceCLibraryextendsLibrary{CLibraryINSTANCE(CLibrary)Native。load((Platform。isWindows()?msvcrt:c),CLibrary。class);voidprintf(Stringformat,Object。。。args);}publicstaticvoidmain(String〔〕args){CLibrary。INSTANCE。printf(Hello,World);for(inti0;iargs。length;i){CLibrary。INSTANCE。printf(Argumentd:s,i,args〔i〕);}}}
  这个例子中,我们想要加载系统的clib,从而使用clib中的printf方法。
  具体做法就是创建一个CLibraryinterface,这个interface继承自Library,然后使用Native。load方法来加载clib,最后在这个interface中定义要使用的lib中的方法即可。
  那么在RocketMQ中,底层又是如何通过JNI来实现mlock的调用呢,来看源代码:publicvoidmlock(){finallongbeginTimeSystem。currentTimeMillis();finallongaddress((DirectBuffer)(this。mappedByteBuffer))。address();PointerpointernewPointer(address);{intretLibC。INSTANCE。mlock(pointer,newNativeLong(this。fileSize));log。info(mlock{}{}{}ret{}timeconsuming{},address,this。fileName,this。fileSize,ret,System。currentTimeMillis()beginTime);}{intretLibC。INSTANCE。madvise(pointer,newNativeLong(this。fileSize),LibC。MADVWILLNEED);log。info(madvise{}{}{}ret{}timeconsuming{},address,this。fileName,this。fileSize,ret,System。currentTimeMillis()beginTime);}}
  上边代码中的LIbC是一个作者封装的系统调用接口文件,里面正好是映射了操作系统的mlock方法。packageorg。apache。rocketmq。store。util;importcom。sun。jna。Library;importcom。sun。jna。Native;importcom。sun。jna。NativeLong;importcom。sun。jna。Platform;importcom。sun。jna。Pointer;publicinterfaceLibCextendsLibrary{LibCINSTANCE(LibC)Native。loadLibrary(Platform。isWindows()?msvcrt:c,LibC。class);intMADVWILLNEED3;intMADVDONTNEED4;intMCLCURRENT1;intMCLFUTURE2;intMCLONFAULT4;syncmemoryasynchronouslyintMSASYNC0x0001;invalidatemappingscachesintMSINVALIDATE0x0002;synchronousmemorysyncintMSSYNC0x0004;intmlock(Pointervar1,NativeLongvar2);intmunlock(Pointervar1,NativeLongvar2);intmadvise(Pointervar1,NativeLongvar2,intvar3);Pointermemset(Pointerp,intv,longlen);intmlockall(intflags);intmsync(Pointerp,NativeLonglength,intflags);}
  通过调用mlock可以将进程使用的部分或者全部的地址空间锁定在物理内存中,防止其被交换到swap空间。对时间敏感的应用会希望全部使用物理内存,提高数据访问和操作的效率。例如,memcached就提供锁定内存的选项,保证memcached使用内存全部在物理内存中。
  通过mlock锁定对应的地址空间,防止被swap出去,这样的效果,也正是RocketMQ所需要的。
  来源:https:mp。weixin。qq。comsQrf8fCvBbNGEbqsLf3lRvA

摩尔多瓦总统提名雷切安为总理来源新华社基希讷乌消息摩尔多瓦总统桑杜10日签署总统令,提名多林雷切安为新总理。当天早些时候,加夫里利察宣布辞去摩尔多瓦总理职务。据摩通社报道,桑杜遂同议会政党进行磋商并作上述决定济宁兖州区兴隆庄街道强化监督出重拳优化营商见成效大众网海报新闻见习记者魏东辉济宁报道近日,济宁兖州区兴隆庄街道为更好地服务市场主体,强化监督出重拳,不断培育和激发市场主体活力,持续优化营商环境软实力。兴隆庄街道推出营商环境红黑榜复古传奇黑暗光年失落币如何获得失落币怎么使用在黑暗光年游戏里,有很多有用的货币,诸如灵符元宝等,甚至还有一种叫失落币的稀有货币。相信很多新手玩家对这种货币不太熟悉的,今天小编就来告诉大家,黑暗光年失落币如何获得失落币怎么使用复古传奇黑暗光年弑神锻造在哪里在黑暗光年游戏里,弑神锻造又称之为斗战胜佛锻造,是勇士们追求的高端装备。很多小伙伴未曾知道这类装备怎么弄的,今天小编就来为大家介绍下,黑暗光年弑神锻造的玩法内容吧。大家等级满足15佳能EOSR8R50上手体验小巧紧凑性能出色2023年2月8日,佳能正式发布了新一代全画幅专微相机EOSR8APSC画幅青春专微EOSR50等新品,进一步扩展了R影像系统,也给广大用户提供了新的机身和镜头选择。无忌有幸在第一净月潭二月份游园攻略已备好及时查收天气回暖,别在家宅着了,出去玩怎么样?中呀!想去环境好,能放松又能玩的地方妥,安排,去这怎么样!二月初春,相约净月潭感受冬天的快乐,收录春天的美好游园攻略已为您准备好,拿走不谢温馨北宋中期,王安石与富弼的经略辽夏思想分别是什么?王安石经略辽夏思想熙宁四年(1071),神宗问及经制辽夏问题时,王安石坦言昔魏徵有言中国既安,远人自服此实至理。自古未有政事修而财用不足远人不服者。熙宁五年(1072)正月,神宗与他1977年出狱穷困潦倒,妻离子散,小19岁美女说我就嫁老丑穷!有几个男人能有他晚年幸运呢?1977年4月,他踏出待了9年的监狱门,门外没亲人,身上没有生活的钱。回到台北,他将一辆破旧汽车当成了家,靠着朋友接济艰难度日。此时的他,已经57岁,离什么是超级细菌?气候变化对超级细菌有什么影响?全球气候变暖,给生态环境带来不可逆的变化,例如北极南极冰雪融化,极地的冰雪面积越来越小,同时,海平面不断升高,尤其是海中的小岛日益下沉。并且,气候的变化还带来了超级细菌的增加。那么自动驾驶尚存长尾难题影子驾驶的AB面在汽车电动化革命之后,自动驾驶技术将掀起新的一场革命。然而,部分零碎的场景极端的情况和无法预测的人类行为等长尾难题仍是制约自动驾驶发展的关键难题。在此背景下,有业内人士提出,以影子chatGPT写CRC校验,生成多项式X7X6X5X21,用PHP怎么实现试了一下写crc校验与加密,确实能实现呢。具体代码如下lt?phpfunctioncrccheck(data)generator0xA1crc0x00for(i0istrlen(d
中国致力于促进互联网发展成果惠及不同国家和地区人民国务院新闻办公室7日发布的携手构建网络空间命运共同体白皮书指出,中国坚持以人为本科技向善,积极响应国际社会需求,共同致力于弥合数字鸿沟,推动网络文化交流与文明互鉴,加强对弱势群体的瞄准20万元以上市场合创首款纯电轿车将于年内上市交付21世纪经济报道记者宋豆豆报道近日新能源汽车品牌合创汽车首款轿跑车型A06正式开启大定,共发布了前驱550A630C以及四驱性能版共3个版本,补贴后的价格区间为1827万元,预计将重磅!三个国家创新中心获批,两只股票直接拉涨停!半夏李蓓A股很可能站在新一轮长牛的起点今天,A股市场和港股市场整体继续上涨,A股有色金属板块多只个股大涨,其中千亿有色金属巨头紫金矿业A股盘中一度触及涨停。港股今天上午表现更为强势,恒生科技指数上午盘中涨幅甚至超过了5京东物流布局北斗7仓,推进链网融合,助力西北地区高质量发展今年京东11。11前,京东物流亚洲一号西安智能产业园(以下简称西安亚一)2期正式启动运营,建成了西北地区首个全流程柔性生产物流园区,应用北斗新仓模式。西安亚一2期落地后,陕西省内9资讯上海开放首批自动驾驶高速公路,开放道路里程超500公里文懂车帝原创魏微懂车帝原创行业11月7日,据澎湃新闻报道,在第二届智能交通上海论坛开幕环节,上海市首批自动驾驶高速公路,嘉定区域内G1503绕城高速21。5公里G2京沪高速19。5外媒全球芯片市场格局生变,美芯片企业遇寒气作为芯片市场的霸主,老美在芯片领域拥有据对的话语权,这也是过去几十年里,中企选择造不如买这条路的主要原因。然而,落后就要挨打,华为就是血的教训。但美修改芯片规则,也遭到了不少美芯片2022年双十一哪款旗舰手机值得购买?头条创作挑战赛不同人买旗舰手机的原因也许并不一样,有人真的需要旗舰的顶级配置,有些人可能只在意它的品牌,价格,或者说只是因为它贵有面子才选。回到旗舰机本身来说,各家最主要的区别就是华为余承东再放豪言未来智能座舱只有鸿蒙和其他未来智能座舱分为两种,一种是鸿蒙,另一种是others。11月4日,华为智能汽车解决方案BUCEO余承东在2022华为开发者大会上这样说道。会上,余承东还宣布2023年第一季度,A从首富跌落神坛的大佬们,现在他们过得怎么样?一年一度的双十一购物狂欢节转眼就到了,这个由淘宝发明的促销活动今年已经进行到第十四年,并引发了线上线下大小商家们的争相效仿。从广告投放效果来看,这两年双十一宣传声势最大的还当属淘宝首批6艘订单曝光!中国最老造船厂进军汽车运输船市场全球汽车运输船市场正在经历前所未有的火热行情,新船订单已经创下十余年新高,如今,又有一家中国船企进军汽车船市场。日前,福建省船舶工业集团有限公司旗下福建船政重工股份有限公司联合有关2022年,外资加速抄底中国房地产的动作,什么信号?又如何解读?11月财经新势力这是熊猫贝贝的第1384篇原创文章外资加码布局中国房地产市场2022年,全球资本对于中国的房地产资产领域的态度,出现了非常微妙的转变。2016年10月4日,彼时的国
友情链接:快好知快生活快百科快传网中准网文好找聚热点快软网