发布网友 发布时间:2024-10-22 06:29
共1个回答
热心网友 时间:2024-10-23 03:30
LongAdder和AtomicLong类似,但是在多线程更新的情况下LongAdder具有更高的性能。 LongAdder更适合统计类的场景,例如监控统计、计数统计等。 例如我们想实现一个单词计数器,可以通过如下代码实现:
//定义一个ConcurrentHashMap存放单词和计数的映射,key为单词,value是LongAdderprivateMap<String,LongAdder>wordCounterMap=newConcurrentHashMap<>();//这个是单词次数统计操作publicvoidaddCount(Stringword){//首先读一次,在word重复较多的场景下能够减少锁冲突,因为computeIfAbsent方法内部有加锁LongAddercounter=wordCounterMap.get(word);if(counter==null){//如果当前map还没有word映射,则通过computeIfAbsent原子性创建映射counter=wordCounterMap.computeIfAbsent(word,newFunction<String,LongAdder>(){@OverridepublicLongAdderapply(Strings){returnnewLongAdder();}});}//调用对应的counter.add方法原子性加1counter.add(1L);}LongAdder类继承自Striped64类,LongAdder类中并没有字段,状态都保存在Striped64中。 LongAdder封装了add和sum两个方法,add方法负责修改计数,sum读取计数。
Striped64采取了分片的思想提高并发度,其中保存了一个long类型的base字段和一个Cell数组,每个Cell中也有一个value字段。 当没有冲突的时候,会通过cas base字段来更新,当出现冲突的时候,会更新cell数组,当cell数组更新冲突时,会进行cell数组扩容来减少冲突。
LongAdder的add更新逻辑为
publicclassLongAdderextendsStriped64{publicvoidadd(longx){Cell[]cs;longb,v;intm;Cellc;//首先判断cells是否为空,如果不为空说明已经出现了多线程casbase冲突,则进入到if语句中//如果为空,则会尝试casbase值,如果更新成功,返回,否则进入到if语句中if((cs=cells)!=null||!casBase(b=base,b+x)){booleanuncontended=true;//如果cells是空的或者cas更新当前线程对应的cell值冲突,则调用longAccumulate方法,longAccumulate方法负责cells初始化、扩容、尝试减少冲突等逻辑if(cs==null||(m=cs.length-1)<0||(c=cs[getProbe()&m])==null||!(uncontended=c.cas(v=c.value,v+x)))longAccumulate(x,null,uncontended);}}//sum方法的逻辑比较简单,把base和所有cell的value加起来就是sum总和。publiclongsum(){Cell[]cs=cells;longsum=base;if(cs!=null){for(Cellc:cs)if(c!=null)sum+=c.value;}returnsum;}}再看下Striped64类,先定义了一个Cell类,并且用@Contended注解标注来避免伪共享问题。Cell中有一个value字段表示当前这个cell的值。所有的cell和base加起来就是最终的值。Cell类也提供了cas方法,内部是通过VarHandle类实现的,这个类似之前版本的Unsafe的功能。 然后Stripe64类中定义了cells数组、base变量和cellsBusy自旋锁字段。 cell数组用来存储各个分片内的值。base变量会在没有线程竞争的时候使用,减少内存占用。 cellsBusy起到一个简单的自旋锁,当线程通过cas从0改成1时说明这个线程获取到了锁。
abstractclassStriped64extendsNumber{@jdk.internal.vm.annotation.ContendedstaticfinalclassCell{volatilelongvalue;Cell(longx){value=x;}finalbooleancas(longcmp,longval){returnVALUE.compareAndSet(this,cmp,val);}finalvoidreset(){VALUE.setVolatile(this,0L);}finalvoidreset(longidentity){VALUE.setVolatile(this,identity);}finallonggetAndSet(longval){return(long)VALUE.getAndSet(this,val);}//VarHandlemechanicsprivatestaticfinalVarHandleVALUE;static{try{MethodHandles.Lookupl=MethodHandles.lookup();VALUE=l.findVarHandle(Cell.class,"value",long.class);}catch(ReflectiveOperationExceptione){thrownewExceptionInInitializerError(e);}}}staticfinalintNCPU=Runtime.getRuntime().availableProcessors();/***Tableofcells.Whennon-null,sizeisapowerof2.*/transientvolatileCell[]cells;/***Basevalue,usedmainlywhenthereisnocontention,butalsoas*afallbackduringtableinitializationraces.UpdatedviaCAS.*/transientvolatilelongbase;/***Spinlock(lockedviaCAS)usedwhenresizingand/orcreatingCells.*/transientvolatileintcellsBusy;Striped64类的核心方法是longAccumulate,它会尝试找到当前线程对应的cell,并且在需要的时候完成cells数组初始化、cell对象初始化、扩容、调整probe值解决冲突等逻辑,流程图如下
finalvoidlongAccumulate(longx,LongBinaryOperatorfn,booleanwasUncontended){inth;//通过当前Thread的probe值判断如果没有初始化,就调用ThreadLocalRandom.current()方法触发初始化,初始化后会设置probe值。if((h=getProbe())==0){//intp=probeGenerator.addAndGet(PROBE_INCREMENT);//intprobe=(p==0)?1:p;//skip0//U.putInt(t,PROBE,probe);ThreadLocalRandom.current();//forceinitializationh=getProbe();wasUncontended=true;}booleancollide=false;//Trueiflastslotnonemptydone:for(;;){Cell[]cs;Cellc;intn;longv;//当前cells不为空的时候,会优先使用cells进行更新//TOOD什么情况下cells!=null但是cells.length==0呢?if((cs=cells)!=null&&(n=cs.length)>0){//通过probe与n-1与操作找到当前线程对应的cells数组的下标,如果为空,则需要加锁进行初始化Cell对象if((c=cs[(n-1)&h])==null){//乐观锁判断if(cellsBusy==0){//TrytoattachnewCell//乐观创建Cell对象,为什么在这里创建而不是在下面获取到锁之后创建呢?主要是为了减少加锁中的操作开销减小锁粒度Cellr=newCell(x);//Optimisticallycreate//获取自旋锁if(cellsBusy==0&&casCellsBusy()){try{//Recheckunderlock//拿到锁之后,需要再判断下,因为可能有其他线程在casCellsBusy之前已经执行过下面的代码了。Cell[]rs;intm,j;//这里为什么需要再判断cells不为null且length不大于0呢?难道从外层的if进来之后,cells可能重新被置为null?if((rs=cells)!=null&&(m=rs.length)>0&&//再check下对应的cell有没有创建了rs[j=(m-1)&h]==null){//没有创建则使用我们创建的初始值为x的cell赋值rs[j]=r;//退出循环breakdone;}}finally{//释放锁cellsBusy=0;}continue;//Slotisnownon-empty}}collide=false;}//cells刚创建的时候,wasUncontended为true则会走到下面的cas//LongAdder中调用add如果addcellcas失败,wasUncontended会是false,则不会再尝试cas,直接走到修改probe再重试的逻辑elseif(!wasUncontended)//CASalreadyknowntofailwasUncontended=true;//Continueafterrehash//进行一次cas当前cell的尝试elseif(c.cas(v=c.value,(fn==null)?v+x:fn.applyAsLong(v,x)))break;//cas失败并且当前数组数量已经大于等于cpu数量了或者发生了扩容,则elseif(n>=NCPU||cells!=cs)//cell数组数量大于等于cpu数量后不再扩容,或者已经被其他线程扩容过了,都重置collide变量标记位当前没有冲突,这样就不会走到下面的扩容逻辑中了collide=false;//Atmaxsizeorstaleelseif(!collide)//collide为false的时候,就会进行修改probe并重试并修改collide为true,这样下次就会走到下面的elseif也就是扩容逻辑。collide=true;//当和其他线程出现碰撞的时候,会进行扩容,这里也加上自旋锁elseif(cellsBusy==0&&casCellsBusy()){try{//doublecheck,防止其他线程已经进行了修改if(cells==cs)//Expandtableunlessstale//扩容cells数组为当前双倍长度,和HashMap不同的是这里的扩容不需要转移数据,因为Striped64对外表示的是总和。cells=Arrays.copyOf(cs,n<<1);}finally{//释放锁cellsBusy=0;}collide=false;continue;//Retrywithexpandedtable}//走到这里说明之前出现了锁竞争,通过类似随机数的调整修改当前线程的probe值,来尝试减少冲突h=advanceProbe(h);}//cells为空的时候,需要进行cells数组初始化,cellsBusy变量起到了自旋锁的作用,成功cascellsBusy从0到1的线程,会负责初始化cells数组elseif(cellsBusy==0&&cells==cs&&casCellsBusy()){try{//Initializetable//再check下,这是防止判断cellsBusy==0&&cells==cs后还没执行casCellsBusy时,有其他线程先完成了初始化//如果是这种情况,cells和cs不是相同的值,则会在下次循环在上面的if语句中处理if(cells==cs){//数组默认大小为2Cell[]rs=newCell[2];//等同于用prob取余获取当前线程对应的cell,创建Cell对象rs[h&1]=newCell(x);//把新创建的数组赋值给Striped64的cells字段cells=rs;breakdone;}}finally{//释放锁cellsBusy=0;}}//最后的降级,会尝试使用basecas更新,如果cas成功,返回,否则继续循环重试//发生的时机//Fallbackonusingbaseelseif(casBase(v=base,(fn==null)?v+x:fn.applyAsLong(v,x)))breakdone;}}}AtomicLong和LongAdder的选择
大部分情况下AtomicLong都能满足需求,通过AtomicLong我们能够实现原子更新(incrementAndGet, compareAndSet等方法),保证线程安全等
当写多读少,且写可能出现较多竞争时,可以考虑使用LongAdder,适用的场景例如有请求次数统计这样的监控场景
对内存占用比较敏感时,更适合用AtomicLong
作者: bytejava
更多参考资料Optimistic lock, pessimistic lock, CAS lock, spin lock, do you know?