Java中有哪些队列ArrayBlockingQueue使用ReentrantLockLinkedBlockingQueue使用ReentrantLockConcurrentLinkedQueue使用CAS等等 我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下Disruptor。Disruptor简单使用 github地址:https:github。comLMAXExchangedisruptorwikiPerformanceResults 先简单介绍下:Disruptor它是一个开源的并发框架,并获得2011Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单日志框架Log4j2异步模式采用了Disruptor来处理局限呢,他就是个内存队列,也就是说无法支撑分布式场景。 简单使用 数据传输对象DatapublicclassEventData{privateLongvalue;} 消费者publicclassEventConsumerimplementsWorkHandlerEventData{消费回调parameventDatathrowsExceptionOverridepublicvoidonEvent(EventDataeventData)throwsException{Thread。sleep(5000);System。out。println(Thread。currentThread(),eventData:eventData。getValue());}} 生产者publicclassEventProducer{privatefinalRingBufferEventDataringBuffer;publicEventProducer(RingBufferEventDataringBuffer){this。ringBufferringBuffer;}publicvoidsendData(Longv){cas展位longnextringBuffer。next();try{EventDataeventDataringBuffer。get(next);eventData。setValue(v);}finally{通知等待的消费者System。out。println(EventProducersendsuccess,sequence:next);ringBuffer。publish(next);}}} 测试类publicclassDisruptorTest{publicstaticvoidmain(String〔〕args){2的n次方intbufferSize8;DisruptorEventDatadisruptornewDisruptorEventData(()newEventData(),事件工厂bufferSize,环形数组大小Executors。defaultThreadFactory(),线程池工厂ProducerType。MULTI,支持多事件发布者newBlockingWaitStrategy());等待策略设置消费者disruptor。handleEventsWithWorkerPool(newEventConsumer(),newEventConsumer(),newEventConsumer(),newEventConsumer());disruptor。start();RingBufferEventDataringBufferdisruptor。getRingBuffer();EventProducereventProducernewEventProducer(ringBuffer);longi0;for(;;){i;eventProducer。sendData(i);try{Thread。sleep(1500);}catch(InterruptedExceptione){e。printStackTrace();}}}}核心组件 基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢? RingBuffer:环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作 Sequencer:核心管家定义生产同步的实现:SingleProducerSequencer单生产、MultiProducerSequencer多生产当前写的进度Sequencecursor所有消费者进度的数组Sequence〔〕gatingSequencesMultiProducerSequencer可用区availableBuffer【利用空间换取查询效率】 Sequence:本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger;还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。 workProcessor:处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler EventHandler:负责业务逻辑的handler,自己实现。 WaitStrategy:消费者如何等待事件的策略,定义了如下策略leepingWaitStrategy:自旋yieldsleepBlockingWaitStrategy:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的YieldingWaitStrategy:自旋yield自旋BusySpinWaitStrategy:自旋,减少线程之前切换PhasedBackoffWaitStrategy:自旋yield自定义策略带着问题来解析代码? 1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】 每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。 接下来我们看下多生产类MultiProducerSequencer中next方法【获取生产序号】消费者上一次消费的最小序号后续第二点会讲到privatefinalSequencegatingSequenceCachenewSequence(Sequencer。INITIALCURSORVALUE);当前进度的序号protectedfinalSequencecursornewSequence(Sequencer。INITIALCURSORVALUE);所有消费者的序号后续第二点会讲到protectedvolatileSequence〔〕gatingSequencesnewSequence〔0〕;publiclongnext(intn){if(n1){thrownewIllegalArgumentException(nmustbe0);}longcurrent;longnext;do{当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值currentcursor。get();要申请的序号空间:最大序列号nextcurrentn;longwrapPointnextbufferSize;消费者最小序列号longcachedGatingSequencegatingSequenceCache。get();大于一圈最小消费序列号当前进度if(wrapPointcachedGatingSequencecachedGatingSequencecurrent){longgatingSequenceUtil。getMinimumSequence(gatingSequences,current);说明大于1圈,并没有多余空间可以申请if(wrapPointgatingSequence){LockSupport。parkNanos(1);TODO,shouldwespinbasedonthewaitstrategy?continue;}更新最小值到Sequence的value中gatingSequenceCache。set(gatingSequence);}CAS成功后更新当前Sequence的valueelseif(cursor。compareAndSet(current,next)){break;}}while(true);returnnext;} 2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】 从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号buffersize要小于等于最小消费的序列号】的时候,才能申请到当前写的序号 publicfinalEventHandlerGroupThandleEventsWithWorkerPool(finalWorkHandlerT。。。workHandlers){returncreateWorkerPool(newSequence〔0〕,workHandlers);}EventHandlerGroupTcreateWorkerPool(finalSequence〔〕barrierSequences,finalWorkHandlerlt;?superT〔〕workHandlers){finalSequenceBarriersequenceBarrierringBuffer。newBarrier(barrierSequences);finalWorkerPoolTworkerPoolnewWorkerPool(ringBuffer,sequenceBarrier,exceptionHandler,workHandlers);consumerRepository。add(workerPool,sequenceBarrier);finalSequence〔〕workerSequencesworkerPool。getWorkerSequences();updateGatingSequencesForNextInChain(barrierSequences,workerSequences);returnnewEventHandlerGroup(this,consumerRepository,workerSequences);}privatevoidupdateGatingSequencesForNextInChain(finalSequence〔〕barrierSequences,finalSequence〔〕processorSequences){if(processorSequences。length0){消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequencesringBuffer。addGatingSequences(processorSequences);for(finalSequencebarrierSequence:barrierSequences){ringBuffer。removeGatingSequence(barrierSequence);}consumerRepository。unMarkEventProcessorsAsEndOfChain(barrierSequences);}} 3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】 这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer使用了int数组【availableBuffer】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer中当前序列号置为1标识可以读取。 如此消费者可以读取的的最大序号就是我们availableBuffer中第一个不可用序号1。 初始化availableBuffer流程publicMultiProducerSequencer(intbufferSize,finalWaitStrategywaitStrategy){super(bufferSize,waitStrategy);初始化可用数组availableBuffernewint〔bufferSize〕;indexMaskbufferSize1;indexShiftUtil。log2(bufferSize);initialiseAvailableBuffer();}初始化默认availableBuffer为1privatevoidinitialiseAvailableBuffer(){for(intiavailableBuffer。length1;i!0;i){setAvailableBufferValue(i,1);}setAvailableBufferValue(0,1);}生产者成功生产事件将可用区数组置为1publicvoidpublish(finallongsequence){setAvailable(sequence);waitStrategy。signalAllWhenBlocking();}privatevoidsetAvailableBufferValue(intindex,intflag){longbufferAddress(indexSCALE)BASE;UNSAFE。putOrderedInt(availableBuffer,bufferAddress,flag);} 消费者消费流程WorkProcessor类中消费run方法publicvoidrun(){booleanprocessedSequencetrue;longcachedAvailableSequenceLong。MINVALUE;longnextSequencesequence。get();Teventnull;while(true){try{先通过cas获取消费事件的占有权if(processedSequence){processedSequencefalse;do{nextSequenceworkSequence。get()1L;sequence。set(nextSequence1L);}while(!workSequence。compareAndSet(nextSequence1L,nextSequence));}数据就绪,可以消费if(cachedAvailableSequencenextSequence){eventringBuffer。get(nextSequence);触发回调函数workHandler。onEvent(event);processedSequencetrue;}else{获取可以被读取的下标cachedAvailableSequencesequenceBarrier。waitFor(nextSequence);}}。。。。省略}notifyShutdown();running。set(false);}publiclongwaitFor(finallongsequence)throwsAlertException,InterruptedException,TimeoutException{checkAlert();这个值获取的currentwrite下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开longavailableSequencewaitStrategy。waitFor(sequence,cursorSequence,dependentSequence,this);if(availableSequencesequence){returnavailableSequence;}通过availableBuffer筛选出第一个不可用序号1returnsequencer。getHighestPublishedSequence(sequence,availableSequence);}publiclonggetHighestPublishedSequence(longlowerBound,longavailableSequence){从currentread下标开始,循环至currentwrite,如果碰到availableBuffer为1直接返回for(longsequencelowerBound;sequenceavailableSequence;sequence){if(!isAvailable(sequence)){returnsequence1;}}returnavailableSequence;}解决伪共享问题什么是伪共享问题呢? 为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。 伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cacheline中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cacheline中,每次却要去内存中拉取。 Disruptor是如何解决的呢? 在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存 回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?缓存行填充,避免缓存频繁失效。【java8中也引入sun。misc。Contended注解来避免伪共享】无锁竞争:通过CAS【二阶段提交】环形数组:数据都是覆盖,避免GC底层更多的使用位运算来提升效率