前言 concurrentMap是hashMap在并发情况下的替代map,是线程安全 本文针对1。8的源码分析问题如何保证线程安全?读加锁么,写加锁么?在jdk1。8与jdk1。7的差别?为什么key不能为null,value不能为null?原理概况 通过NodeK,V〔〕数组来保存添加到Map中的键值对keyhash不冲突,放在数组Keyhash冲突,以链表形式放在数组链表长度超过8个,又小于64,扩容数组,如果数组长度大于等于64,则转换成树扩容后,某个链表的数组长度小于6个,则会将该树转为链表源码结构源码 元素的基本节点staticclassNodeK,VimplementsMap。EntryK,V{finalinthash;finalKkey;volatileVval;volatileNodeK,Vnext;Node(inthash,Kkey,Vval,NodeK,Vnext){this。hashhash;this。keykey;this。valval;this。nextnext;}略 树的节点staticfinalclassTreeNodeK,VextendsNodeK,V{TreeNodeK,Vparent;redblacktreelinksTreeNodeK,Vleft;TreeNodeK,Vright;TreeNodeK,Vprev;neededtounlinknextupondeletionbooleanred;TreeNode(inthash,Kkey,Vval,NodeK,Vnext,TreeNodeK,Vparent){super(hash,key,val,next);this。parentparent;}略}重要方法 在ConcurrentHashMap中使用了unSafe方法,通过直接操作内存的方式来保证并发处理的安全性,使用的是硬件的安全机制用来返回节点数组的指定位置的节点的原子操作SuppressWarnings(unchecked)staticfinalK,VNodeK,VtabAt(NodeK,V〔〕tab,inti){return(NodeK,V)U。getObjectVolatile(tab,((long)iASHIFT)ABASE);}cas原子操作,在指定位置设定值staticfinalK,VbooleancasTabAt(NodeK,V〔〕tab,inti,NodeK,Vc,NodeK,Vv){returnU。compareAndSwapObject(tab,((long)iASHIFT)ABASE,c,v);}put源码当添加一对键值对的时候,首先会去判断保存这些键值对的数组是不是初始化了,如果没有的话就初始化数组然后通过计算hash值来确定放在数组的哪个位置如果这个位置为空则直接添加,如果不为空的话,则取出这个节点来如果取出来的节点的hash值是MOVED(1)的话,则表示当前正在对这个数组进行扩容,复制到新的数组,则当前线程也去帮助复制最后一种情况就是,如果这个节点,不为空,也不在扩容,则通过synchronized来加锁,进行添加操作然后判断当前取出的节点位置存放的是链表还是树如果是链表的话,则遍历整个链表,直到取出来的节点的key来个要放的key进行比较,如果key相等,并且key的hash值也相等的话,则说明是同一个key,则覆盖掉value,否则的话则添加到链表的末尾如果是树的话,则调用putTreeVal方法把这个元素添加到树中去最后在添加完成之后,会判断在该节点处共有多少个节点(注意是添加前的个数),如果达到8个以上了的话,则调用treeifyBin方法来尝试将处的链表转为树,或者扩容数组finalVputVal(Kkey,Vvalue,booleanonlyIfAbsent){k,v都不能为空,否则会报空指针异常if(keynullvaluenull)thrownewNullPointerException();取得Key的hash值inthashspread(key。hashCode());用来计算在这个节点总共有多少个元素,用来控制扩容或者转移树intbinCount0;for(NodeK,V〔〕tabtable;;){NodeK,Vf;intn,i,fh;第一次Put,没有初始化,则初始化tableif(tabnull(ntab。length)0)tabinitTable();通过哈希计算出一个表中的位置,因为n是数组的长度,所以(n1)hash肯定不会出现数组越界elseif((ftabAt(tab,i(n1)hash))null){如果这个位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的if(casTabAt(tab,i,null,创建一个Node添加到数组中去,null表示的是下一个节点为空newNodeK,V(hash,key,value,null)))break;nolockwhenaddingtoemptybin}如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失elseif((fhf。hash)MOVED)tabhelpTransfer(tab,f);else{如果这个位置有元素的话,就采用synchronized的方式加锁如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历如果找到了key和key的hash值都一样的节点,则把它的值替换如果没找到的话,则添加在链表的最后面如果是树的话,会对该节点上的关联数目进行判断如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者扩容VoldValnull;再次取出要存储的位置的元素,跟前面取出来的作比较synchronized(f){if(tabAt(tab,i)f){取出来的元素的hash值大于0if(fh0){binCount1;遍历这个链表for(NodeK,Vef;;binCount){Kek;如果hash,key相同,则替换valueif(e。hashhash((eke。key)key(ek!nullkey。equals(ek)))){oldVale。val;当使用putIfAbsent的时候,只有在这个key没有设置值的时候才设置if(!onlyIfAbsent)e。valvalue;break;}NodeK,Vprede;如果不是同样的hash,同样的Key的时候,则判断该节点的下一个节点是否为空,为空吧这个要加入的节点设置为当前节点的下一个节点if((ee。next)null){pred。nextnewNodeK,V(hash,key,value,null);break;}}}如果已经转成红黑树类型了elseif(finstanceofTreeBin){NodeK,Vp;binCount2;将元素添加到树中去if((p((TreeBinK,V)f)。putTreeVal(hash,key,value))!null){oldValp。val;if(!onlyIfAbsent)p。valvalue;}}}}if(binCount!0){当在同一个节点的数目达到8个的时候,则扩张数组或将给节点的数据转为treeif(binCountTREEIFYTHRESHOLD)treeifyBin(tab,i);if(oldVal!null)returnoldVal;break;}}}计数addCount(1L,binCount);returnnull;}扩容源码 节点个数超过8,调用treeifyBin扩容。 节点数组长度小于64,扩张数组长度一倍,否则的话把链表转为树Replacesalllinkednodesinbinatgivenindexunlesstableistoosmall,inwhichcaseresizesinstead。当数组长度小于64的时候,扩张数组长度一倍,否则的话把链表转为树privatefinalvoidtreeifyBin(NodeK,V〔〕tab,intindex){NodeK,Vb;intn,sc;if(tab!null){if((ntab。length)MINTREEIFYCAPACITY)MINTREEIFYCAPACITY64tryPresize(n1);数组扩容elseif((btabAt(tab,index))!nullb。hash0){synchronized(b){使用synchronized同步器,将该节点出的链表转为树if(tabAt(tab,index)b){TreeNodeK,Vhdnull,tlnull;hd:树的头(head)for(NodeK,Veb;e!null;ee。next){TreeNodeK,VpnewTreeNodeK,V(e。hash,e。key,e。val,null,null);if((p。prevtl)null)把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置hdp;设置headelsetl。nextp;tlp;}setTabAt(tab,index,newTreeBinK,V(hd));把TreeNode的链表放入容器TreeBin中}}}}}读源码 读支持并发操作相比put方法,get就很单纯了,支持并发操作,当key为null的时候回抛出NullPointerException的异常get操作通过首先计算key的hash值来确定该元素放在数组的哪个位置然后遍历该位置的所有节点如果不存在的话返回nullpublicVget(Objectkey){NodeK,V〔〕tab;NodeK,Ve,p;intn,eh;Kek;inthspread(key。hashCode());if((tabtable)!null(ntab。length)0(etabAt(tab,(n1)h))!null){if((ehe。hash)h){if((eke。key)key(ek!nullkey。equals(ek)))returne。val;}elseif(eh0)return(pe。find(h,key))!null?p。val:null;while((ee。next)!null){if(e。hashh((eke。key)key(ek!nullkey。equals(ek))))returne。val;}}returnnull;}1。7及以下版本原理 ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁分段锁结构 staticfinalclassSegmentK,VextendsReentrantLockimplementsSerializable{Thenumberofelementsinthissegmentsregion。transientvolatileintcount;Numberofupdatesthatalterthesizeofthetable。Thisisusedduringbulkreadmethodstomakesuretheyseeaconsistentsnapshot:IfmodCountschangeduringatraversalofsegmentscomputingsizeorcheckingcontainsValue,thenwemighthaveaninconsistentviewofstateso(usually)mustretry。transientintmodCount;Thetableisrehashedwhenitssizeexceedsthisthreshold。(Thevalueofthisfieldisalwaystt(int)(capacityloadFactor)tt。)transientintthreshold;Thepersegmenttable。transientvolatileHashEntryK,V〔〕table;Theloadfactorforthehashtable。Eventhoughthisvalueissameforallsegments,itisreplicatedtoavoidneedinglinkstoouterobject。serialfinalfloatloadFactor;Segment(floatlf,intthreshold,HashEntryK,V〔〕tab){this。loadFactorlf;this。thresholdthreshold;this。tabletab;}}插入源码 先加锁,再操作publicVput(Kkey,Vvalue){SegmentK,Vs;if(valuenull)thrownewNullPointerException();inthashhash(key。hashCode());intj(hashsegmentShift)segmentMask;if((s(SegmentK,V)UNSAFE。getObjectnonvolatile;recheck(segments,(jSSHIFT)SBASE))null)inensureSegmentsensureSegment(j);returns。put(key,hash,value,false);}finalVput(Kkey,inthash,Vvalue,booleanonlyIfAbsent){HashEntryK,VnodetryLock()?null:scanAndLockForPut(key,hash,value);VoldValue;try{HashEntryK,V〔〕tabtable;intindex(tab。length1)hash;HashEntryK,VfirstentryAt(tab,index);for(HashEntryK,Vefirst;;){if(e!null){Kk;if((ke。key)key(e。hashhashkey。equals(k))){oldValuee。value;if(!onlyIfAbsent){e。valuevalue;modCount;}break;}ee。next;}else{if(node!null)node。setNext(first);elsenodenewHashEntryK,V(hash,key,value,first);intccount1;if(cthresholdtab。lengthMAXIMUMCAPACITY)rehash(node);elsesetEntryAt(tab,index,node);modCount;countc;oldValuenull;break;}}}finally{unlock();}returnoldValue;}读取源码Returnsthevaluetowhichthespecifiedkeyismapped,or{codenull}ifthismapcontainsnomappingforthekey。pMoreformally,ifthismapcontainsamappingfromakey{codek}toavalue{codev}suchthat{codekey。equals(k)},thenthismethodreturns{codev};otherwiseitreturns{codenull}。(Therecanbeatmostonesuchmapping。)throwsNullPointerExceptionifthespecifiedkeyisnullpublicVget(Objectkey){SegmentK,Vs;manuallyintegrateaccessmethodstoreduceoverheadHashEntryK,V〔〕tab;inthhash(key。hashCode());longu(((hsegmentShift)segmentMask)SSHIFT)SBASE;原子操作if((s(SegmentK,V)UNSAFE。getObjectVolatile(segments,u))!null(tabs。table)!null){for(HashEntryK,Ve(HashEntryK,V)UNSAFE。getObjectVolatile(tab,((long)(((tab。length1)h))TSHIFT)TBASE);e!null;ee。next){Kk;if((ke。key)key(e。hashhkey。equals(k)))returne。value;}}returnnull;}删除操作 先上锁,再操作finalVremove(Objectkey,inthash,Objectvalue){if(!tryLock())scanAndLock(key,hash);VoldValuenull;try{HashEntryK,V〔〕tabtable;intindex(tab。length1)hash;HashEntryK,VeentryAt(tab,index);HashEntryK,Vprednull;while(e!null){Kk;HashEntryK,Vnexte。next;if((ke。key)key(e。hashhashkey。equals(k))){Vve。value;if(valuenullvaluevvalue。equals(v)){if(prednull)setEntryAt(tab,index,next);elsepred。setNext(next);modCount;count;oldValuev;}break;}prede;enext;}}finally{unlock();}returnoldValue;}扩容 如何扩容。扩容的时候首先会创建一个两倍于原容量的数组,然后将原数组里的元素进行再hash后插入到新的数组里。为了高效ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。privatevoidrehash(HashEntryK,Vnode){Reclassifynodesineachlisttonewtable。Becauseweareusingpoweroftwoexpansion,theelementsfromeachbinmusteitherstayatsameindex,ormovewithapoweroftwooffset。Weeliminateunnecessarynodecreationbycatchingcaseswhereoldnodescanbereusedbecausetheirnextfieldswontchange。Statistically,atthedefaultthreshold,onlyaboutonesixthofthemneedcloningwhenatabledoubles。Thenodestheyreplacewillbegarbagecollectableassoonastheyarenolongerreferencedbyanyreaderthreadthatmaybeinthemidstofconcurrentlytraversingtable。Entryaccessesuseplainarrayindexingbecausetheyarefollowedbyvolatiletablewrite。HashEntryK,V〔〕oldTabletable;intoldCapacityoldTable。length;intnewCapacityoldCapacity1;threshold(int)(newCapacityloadFactor);HashEntryK,V〔〕newTable(HashEntryK,V〔〕)newHashEntry〔newCapacity〕;intsizeMasknewCapacity1;for(inti0;ioldCapacity;i){HashEntryK,VeoldTable〔i〕;if(e!null){HashEntryK,Vnexte。next;intidxe。hashsizeMask;if(nextnull)SinglenodeonlistnewTable〔idx〕e;else{ReuseconsecutivesequenceatsameslotHashEntryK,VlastRune;intlastIdxidx;for(HashEntryK,Vlastnext;last!null;lastlast。next){intklast。hashsizeMask;if(k!lastIdx){lastIdxk;lastRunlast;}}newTable〔lastIdx〕lastRun;Cloneremainingnodesfor(HashEntryK,Vpe;p!lastRun;pp。next){Vvp。value;inthp。hash;intkhsizeMask;HashEntryK,VnnewTable〔k〕;newTable〔k〕newHashEntryK,V(h,p。key,v,n);}}}}intnodeIndexnode。hashsizeMask;addthenewnodenode。setNext(newTable〔nodeIndex〕);newTable〔nodeIndex〕node;tablenewTable;}结论 Q:1。8版本如何保证线程安全?写操作:同步处理主要是通过Synchronized和unsafe两种方式来完成的在取得某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证线程安全读操作:根本没有使用同步机制,也没有使用unsafe方法,所以读操作是支持并发操作的 Q:读加锁么,写加锁么?在1。7版本及以前版本写使用分段锁(可重入锁)保证安全读使用UNSAFE。getObjectVolatile,保证数据一致性在1。8版本写时,取位置使用unsafe,设置节点用Synchronized,保证数据一致性读操作是直接支持并发操作 Q:在jdk1。8与jdk1。7的差别?1。7使用分段锁保证线程安全,1。8使用Synchronized和unsafe两种方式保证线程安全1。7扩容时,分段锁局部扩容,hash算好后,重新分配,1。8整个扩容,然后数据重新放入1。8引入红黑树,在链表太长时,转换,提高查询效率 Q:为什么key不能为null,value不能为null 看源码put入口判断if(keynullvaluenull)thrownewNullPointerException(); 设计意图:二义性,防止歧义 如果判断value为null,不确定这个Null是插入进去的,还是本身真的没值,导致歧义,为了线程安全性,所以都不能为null