专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

构建高性能内存队列Disruptoryyds

  Java中有哪些队列ArrayBlockingQueue使用ReentrantLockLinkedBlockingQueue使用ReentrantLockConcurrentLinkedQueue使用CAS等等
  我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。Disruptor简单使用
  github地址:https:github。comLMAXExchangedisruptorwikiPerformanceResults
  先简单介绍下:Disruptor它是一个开源的并发框架,并获得2011Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单日志框架Log4j2异步模式采用了Disruptor来处理局限呢,他就是个内存队列,也就是说无法支撑分布式场景。
  简单使用
  数据传输对象DatapublicclassEventData{privateLongvalue;}
  消费者publicclassEventConsumerimplementsWorkHandlerEventData{消费回调parameventDatathrowsExceptionOverridepublicvoidonEvent(EventDataeventData)throwsException{Thread。sleep(5000);System。out。println(Thread。currentThread(),eventData:eventData。getValue());}}
  生产者publicclassEventProducer{privatefinalRingBufferEventDataringBuffer;publicEventProducer(RingBufferEventDataringBuffer){this。ringBufferringBuffer;}publicvoidsendData(Longv){cas展位longnextringBuffer。next();try{EventDataeventDataringBuffer。get(next);eventData。setValue(v);}finally{通知等待的消费者System。out。println(EventProducersendsuccess,sequence:next);ringBuffer。publish(next);}}}
  测试类publicclassDisruptorTest{publicstaticvoidmain(String〔〕args){2的n次方intbufferSize8;DisruptorEventDatadisruptornewDisruptorEventData(()newEventData(),事件工厂bufferSize,环形数组大小Executors。defaultThreadFactory(),线程池工厂ProducerType。MULTI,支持多事件发布者newBlockingWaitStrategy());等待策略设置消费者disruptor。handleEventsWithWorkerPool(newEventConsumer(),newEventConsumer(),newEventConsumer(),newEventConsumer());disruptor。start();RingBufferEventDataringBufferdisruptor。getRingBuffer();EventProducereventProducernewEventProducer(ringBuffer);longi0;for(;;){i;eventProducer。sendData(i);try{Thread。sleep(1500);}catch(InterruptedExceptione){e。printStackTrace();}}}}核心组件
  基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?
  RingBuffer:环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作
  Sequencer:核心管家定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产当前写的进度Sequencecursor所有消费者进度的数组Sequence〔〕gatingSequencesMultiProducerSequencer可用区availableBuffer【利用空间换取查询效率】
  Sequence:本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger;还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。
  workProcessor:处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler
  EventHandler:负责业务逻辑的handler,自己实现。
  WaitStrategy:消费者如何等待事件的策略,定义了如下策略leepingWaitStrategy:自旋yieldsleepBlockingWaitStrategy:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的YieldingWaitStrategy:自旋yield自旋BusySpinWaitStrategy:自旋,减少线程之前切换PhasedBackoffWaitStrategy:自旋yield自定义策略带着问题来解析代码?
  1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】
  每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。
  接下来我们看下多生产类MultiProducerSequencer中next方法【获取生产序号】消费者上一次消费的最小序号后续第二点会讲到privatefinalSequencegatingSequenceCachenewSequence(Sequencer。INITIALCURSORVALUE);当前进度的序号protectedfinalSequencecursornewSequence(Sequencer。INITIALCURSORVALUE);所有消费者的序号后续第二点会讲到protectedvolatileSequence〔〕gatingSequencesnewSequence〔0〕;publiclongnext(intn){if(n1){thrownewIllegalArgumentException(nmustbe0);}longcurrent;longnext;do{当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值currentcursor。get();要申请的序号空间:最大序列号nextcurrentn;longwrapPointnextbufferSize;消费者最小序列号longcachedGatingSequencegatingSequenceCache。get();大于一圈最小消费序列号当前进度if(wrapPointcachedGatingSequencecachedGatingSequencecurrent){longgatingSequenceUtil。getMinimumSequence(gatingSequences,current);说明大于1圈,并没有多余空间可以申请if(wrapPointgatingSequence){LockSupport。parkNanos(1);TODO,shouldwespinbasedonthewaitstrategy?continue;}更新最小值到Sequence的value中gatingSequenceCache。set(gatingSequence);}CAS成功后更新当前Sequence的valueelseif(cursor。compareAndSet(current,next)){break;}}while(true);returnnext;}
  2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】
  从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号buffersize要小于等于最小消费的序列号】的时候,才能申请到当前写的序号
  publicfinalEventHandlerGroupThandleEventsWithWorkerPool(finalWorkHandlerT。。。workHandlers){returncreateWorkerPool(newSequence〔0〕,workHandlers);}EventHandlerGroupTcreateWorkerPool(finalSequence〔〕barrierSequences,finalWorkHandlerlt;?superT〔〕workHandlers){finalSequenceBarriersequenceBarrierringBuffer。newBarrier(barrierSequences);finalWorkerPoolTworkerPoolnewWorkerPool(ringBuffer,sequenceBarrier,exceptionHandler,workHandlers);consumerRepository。add(workerPool,sequenceBarrier);finalSequence〔〕workerSequencesworkerPool。getWorkerSequences();updateGatingSequencesForNextInChain(barrierSequences,workerSequences);returnnewEventHandlerGroup(this,consumerRepository,workerSequences);}privatevoidupdateGatingSequencesForNextInChain(finalSequence〔〕barrierSequences,finalSequence〔〕processorSequences){if(processorSequences。length0){消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequencesringBuffer。addGatingSequences(processorSequences);for(finalSequencebarrierSequence:barrierSequences){ringBuffer。removeGatingSequence(barrierSequence);}consumerRepository。unMarkEventProcessorsAsEndOfChain(barrierSequences);}}
  3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】
  这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer使用了int数组【availableBuffer】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer中当前序列号置为1标识可以读取。
  如此消费者可以读取的的最大序号就是我们availableBuffer中第一个不可用序号1。
  初始化availableBuffer流程publicMultiProducerSequencer(intbufferSize,finalWaitStrategywaitStrategy){super(bufferSize,waitStrategy);初始化可用数组availableBuffernewint〔bufferSize〕;indexMaskbufferSize1;indexShiftUtil。log2(bufferSize);initialiseAvailableBuffer();}初始化默认availableBuffer为1privatevoidinitialiseAvailableBuffer(){for(intiavailableBuffer。length1;i!0;i){setAvailableBufferValue(i,1);}setAvailableBufferValue(0,1);}生产者成功生产事件将可用区数组置为1publicvoidpublish(finallongsequence){setAvailable(sequence);waitStrategy。signalAllWhenBlocking();}privatevoidsetAvailableBufferValue(intindex,intflag){longbufferAddress(indexSCALE)BASE;UNSAFE。putOrderedInt(availableBuffer,bufferAddress,flag);}
  消费者消费流程WorkProcessor类中消费run方法publicvoidrun(){booleanprocessedSequencetrue;longcachedAvailableSequenceLong。MINVALUE;longnextSequencesequence。get();Teventnull;while(true){try{先通过cas获取消费事件的占有权if(processedSequence){processedSequencefalse;do{nextSequenceworkSequence。get()1L;sequence。set(nextSequence1L);}while(!workSequence。compareAndSet(nextSequence1L,nextSequence));}数据就绪,可以消费if(cachedAvailableSequencenextSequence){eventringBuffer。get(nextSequence);触发回调函数workHandler。onEvent(event);processedSequencetrue;}else{获取可以被读取的下标cachedAvailableSequencesequenceBarrier。waitFor(nextSequence);}}。。。。省略}notifyShutdown();running。set(false);}publiclongwaitFor(finallongsequence)throwsAlertException,InterruptedException,TimeoutException{checkAlert();这个值获取的currentwrite下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开longavailableSequencewaitStrategy。waitFor(sequence,cursorSequence,dependentSequence,this);if(availableSequencesequence){returnavailableSequence;}通过availableBuffer筛选出第一个不可用序号1returnsequencer。getHighestPublishedSequence(sequence,availableSequence);}publiclonggetHighestPublishedSequence(longlowerBound,longavailableSequence){从currentread下标开始,循环至currentwrite,如果碰到availableBuffer为1直接返回for(longsequencelowerBound;sequenceavailableSequence;sequence){if(!isAvailable(sequence)){returnsequence1;}}returnavailableSequence;}解决伪共享问题什么是伪共享问题呢?
  为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。
  伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cacheline中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cacheline中,每次却要去内存中拉取。
  Disruptor是如何解决的呢?
  在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存
  回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?缓存行填充,避免缓存频繁失效。【java8中也引入sun。misc。Contended注解来避免伪共享】无锁竞争:通过CAS【二阶段提交】环形数组:数据都是覆盖,避免GC底层更多的使用位运算来提升效率

野渡无人泊孤舟,舟中淹留难释愁水路晴空霎时彤云蔽,疾风飕飕似鸣镝。雨云雷频鸣,轰隆复轰隆,白浪素波汹涌起。疾风飕飕似鸣镝。图片来源于网络风过处林木摇曳,呻吟声声甚凄切。金蛇云中游,四野亮如昼,漠漠晦暗顿撕裂。风隐去的鼓声咚咚巴,咚巴咚,咚咚巴,咚巴咚平日里,红石岭就像种在丘陵上的一颗地瓜,静得不发一点儿声息,一朵酸枣花儿在坡上,静静的开了,又悄悄的谢了一只牛儿默默地啃着,犄角顶起了太阳,又扯下了月江西赣州楼市的危机,激进的赣州昂贵的房价,赣州市楼市的潜力江西的地理位置很好,东部靠着长三角,南部靠着珠三角。位于江西南部的城市主要是赣州市,现在的赣州已经成为江西第二大中心城市,被定义为省域副中心城市。在过去的2022年赣州市共完成经济万亿海淀,不偏科2022年,北京市海淀区实现经济总量破万亿,达到10206。9亿元。这意味着海淀成为继上海浦东之后的第二个经济总量破万亿元的市辖区。从2022年公布的数据看,全国范围内GDP过万亿全面注册制改革!中国股民是利好还是利空?这可不好说!点击右上方关注,第一时间获取每天行情点评炒股技巧时事热点资讯等,有任何问题欢迎留言。千呼万唤始出来!全面注册制终于要来了!2023年2月1日,证监会针对全面实行股票发行注册制主要制1月销量比亚迪15万辆同比增59,新势力现危机导读由于受到1月春节放假以及特斯拉降价等因素的波及,国内新能源车企的销量增速普遍放缓,甚至出现负增长。(文潘昱辰编辑周远方)2023年已经过去了第1个月,2月伊始,国内主要新能源车黄金还可以买吗?在中国人心中一向有特殊地位。在很多重要时刻,如嫁娶寿宴新春添丁等,老百姓都喜欢将黄金作为礼物。中国自古还有乱世买黄金的说法,也就是说,每当危机来临时,人们总是希望借助一种便于储藏财去年129家首店落地浦东新区,陆家嘴商圈受青睐上海被视为首店经济的标杆城市,2022年在首店经济发展中继续引领全国。来自中商数据的统计显示,2022年上海共计引入1073家首店(含旗舰店概念店),与2021年基本持平,在首店经智能手机行业萎靡不振,iPhone14销量不佳,苹果也没辙当苹果公司真正走下神坛的时候,外界的关注度反而没那么高。一方面,手机电脑这类数码产品市场早已趋近饱和,消费者对电子数码产品见怪不怪另一方面,苹果公司早就不是那个引领智能手机行业潮流智能自控5连板,股东高管集中减持套现5000多万元2月3日,智能自控(002877。SZ)股价继续高开高走,开盘仅6分钟即封死涨停板。截至收盘,智能自控封单金额2亿元,全天成交3。74亿元,换手率15。71,总市值39。08亿元。原材料价格创新低,新能源车企着急去宁化?随着新能源汽车市场的不断升温,作为核心部件的动力电池也备受关注。在动力电池行业发展过程中,产业链不断追求一个三角,那就是更优越的性能更低的成本更好的安全性。市场上主流的动力电池有磷
集装箱船运输市场3月刊供需两面夹击,运价低迷难解来源国投安信期货2月以来,发货量在春节假期结束后有所回升,但市场供大于求的格局不改,运价走势低迷。需求方面欧美去库进程略见成效,但居民消费降级趋势持续,短期内难见集运贸易量回升。运外媒聚焦瑞士信贷集团危机冲击国际银行业据彭博新闻社网站3月15日报道,瑞士信贷集团(简称瑞信集团)由来已久的麻烦在15日爆发为一场全面危机,该公司的股票和债券暴跌,全球最大的一些银行竞相保护自己的财务状况免受这一潜在后乌兹别克斯坦驻华大使愿在一带一路框架下深化乌中多领域合作法尔霍德阿尔济耶夫致辞。王磊摄中新网成都3月17日电(记者岳依桐)一带一路倡议给沿线国家带来了很多机遇,乌兹别克斯坦是一带一路倡议的推动者支持者。17日,首次到访四川的乌兹别克斯坦今年的清明节不调休了头条创作挑战赛今年的清明节刚好是星期三,没有安排调休了,去年因为假期调休好几次登上了热搜,结果清明节放一天假不调休又被骂了,再一次登上了热搜!这次有三个观点,有的网友觉得清明节是传日照银行山海天支行持续开展消费者权益保护宣传教育大众网海报新闻记者程裕涵日照报道为进一步提升金融消费者的自我保护意识和风险责任意识,构建和谐的金融消费环境,近期,日照银行山海天支行持续开展315消费者权益保护宣传教育,宣传金融消专访保你平安主演王迅再次和大鹏合作,他已经是成熟的导演了由大鹏执导,大鹏李雪琴尹正王迅王圣迪领衔主演的电影保你平安正在热映,目前豆瓣评分已经上升到7。9分,影片因为涉及网暴等热点话题备受关注。作为一部现实题材喜剧,保你平安讲述了一个由谣沸石早已被全球知名企业应用,快来看看有哪些企业吧?沸石作为一种新型环保材料,因其具有高度发达的内部规则孔隙结构,巨大的内外比表面积等特性,因而具有优越的吸附性能,当前在许多领域成为难以替代的主流吸附材料。近年来,沸石在土壤修复水处3。15报道专挑软柿子捏报道的都是小企业,大企难道没问题吗?这个3。15可以说是热闹非凡,在开场前几天更是热足了身,比如很多媒体都在跟踪之前被3。15报道的企业如今怎么样了?因为每一次3。15都能给消费者带来很多冲击和意外,有很多暗访和剧情创建世界一流企业培育一批金种子企业我省将实施新一轮国企改革深化提升行动3月初,湖北省国资国企系统主要负责人密集前往广东深圳山东等省市国资委和国有企业调研取经,以便做好下一步湖北国资国企改革发展工作。政府工作报告提出,深化国资国企改革,提高国企核心竞争专访澳门保险公会会长周士军研究推动在横琴设立保险经营机构,开发设计跨境医疗和养老保险产品南方财经全媒体记者郭晓洁彭敏静珠海报道日前公布的关于金融支持横琴粤澳深度合作区建设的意见(简称横琴金融30条)提及了多项保险相关内容,包括支持内地与澳门保险机构联合研发针对横琴粤澳新型婚家业务离婚跟拍最近在某红薯,悄然兴起了一种新型跟拍服务离婚跟拍。对,你没有听错,主要服务对象就是离婚人群,从他们再次踏入民政局开始,一直跟拍到两个人分道扬镳。如果说结婚跟拍,主打一个男女互动,讲
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网