首页 科普 资讯 养生 问答 找医院 相关问答
首页> 问答

并发编程内幕-LongAdder实现原理和应用

发布网友 发布时间:2024-10-22 06:29

我来回答

1个回答

热心网友 时间:2024-10-23 03:30

LongAdder和AtomicLong类似,但是在多线程更新的情况下LongAdder具有更高的性能。 LongAdder更适合统计类的场景,例如监控统计、计数统计等。 例如我们想实现一个单词计数器,可以通过如下代码实现:

//定义一个ConcurrentHashMap存放单词和计数的映射,key为单词,value是LongAdderprivateMap<String,LongAdder>wordCounterMap=newConcurrentHashMap<>();//这个是单词次数统计操作publicvoidaddCount(Stringword){//首先读一次,在word重复较多的场景下能够减少锁冲突,因为computeIfAbsent方法内部有加锁LongAddercounter=wordCounterMap.get(word);if(counter==null){//如果当前map还没有word映射,则通过computeIfAbsent原子性创建映射counter=wordCounterMap.computeIfAbsent(word,newFunction<String,LongAdder>(){@OverridepublicLongAdderapply(Strings){returnnewLongAdder();}});}//调用对应的counter.add方法原子性加1counter.add(1L);}

LongAdder类继承自Striped64类,LongAdder类中并没有字段,状态都保存在Striped64中。 LongAdder封装了add和sum两个方法,add方法负责修改计数,sum读取计数。

Striped64采取了分片的思想提高并发度,其中保存了一个long类型的base字段和一个Cell数组,每个Cell中也有一个value字段。 当没有冲突的时候,会通过cas base字段来更新,当出现冲突的时候,会更新cell数组,当cell数组更新冲突时,会进行cell数组扩容来减少冲突。

LongAdder的add更新逻辑为

publicclassLongAdderextendsStriped64{publicvoidadd(longx){Cell[]cs;longb,v;intm;Cellc;//首先判断cells是否为空,如果不为空说明已经出现了多线程casbase冲突,则进入到if语句中//如果为空,则会尝试casbase值,如果更新成功,返回,否则进入到if语句中if((cs=cells)!=null||!casBase(b=base,b+x)){booleanuncontended=true;//如果cells是空的或者cas更新当前线程对应的cell值冲突,则调用longAccumulate方法,longAccumulate方法负责cells初始化、扩容、尝试减少冲突等逻辑if(cs==null||(m=cs.length-1)<0||(c=cs[getProbe()&m])==null||!(uncontended=c.cas(v=c.value,v+x)))longAccumulate(x,null,uncontended);}}//sum方法的逻辑比较简单,把base和所有cell的value加起来就是sum总和。publiclongsum(){Cell[]cs=cells;longsum=base;if(cs!=null){for(Cellc:cs)if(c!=null)sum+=c.value;}returnsum;}}

再看下Striped64类,先定义了一个Cell类,并且用@Contended注解标注来避免伪共享问题。Cell中有一个value字段表示当前这个cell的值。所有的cell和base加起来就是最终的值。Cell类也提供了cas方法,内部是通过VarHandle类实现的,这个类似之前版本的Unsafe的功能。 然后Stripe64类中定义了cells数组、base变量和cellsBusy自旋锁字段。 cell数组用来存储各个分片内的值。base变量会在没有线程竞争的时候使用,减少内存占用。 cellsBusy起到一个简单的自旋锁,当线程通过cas从0改成1时说明这个线程获取到了锁。

abstractclassStriped64extendsNumber{@jdk.internal.vm.annotation.ContendedstaticfinalclassCell{volatilelongvalue;Cell(longx){value=x;}finalbooleancas(longcmp,longval){returnVALUE.compareAndSet(this,cmp,val);}finalvoidreset(){VALUE.setVolatile(this,0L);}finalvoidreset(longidentity){VALUE.setVolatile(this,identity);}finallonggetAndSet(longval){return(long)VALUE.getAndSet(this,val);}//VarHandlemechanicsprivatestaticfinalVarHandleVALUE;static{try{MethodHandles.Lookupl=MethodHandles.lookup();VALUE=l.findVarHandle(Cell.class,"value",long.class);}catch(ReflectiveOperationExceptione){thrownewExceptionInInitializerError(e);}}}staticfinalintNCPU=Runtime.getRuntime().availableProcessors();/***Tableofcells.Whennon-null,sizeisapowerof2.*/transientvolatileCell[]cells;/***Basevalue,usedmainlywhenthereisnocontention,butalsoas*afallbackduringtableinitializationraces.UpdatedviaCAS.*/transientvolatilelongbase;/***Spinlock(lockedviaCAS)usedwhenresizingand/orcreatingCells.*/transientvolatileintcellsBusy;

Striped64类的核心方法是longAccumulate,它会尝试找到当前线程对应的cell,并且在需要的时候完成cells数组初始化、cell对象初始化、扩容、调整probe值解决冲突等逻辑,流程图如下

finalvoidlongAccumulate(longx,LongBinaryOperatorfn,booleanwasUncontended){inth;//通过当前Thread的probe值判断如果没有初始化,就调用ThreadLocalRandom.current()方法触发初始化,初始化后会设置probe值。if((h=getProbe())==0){//intp=probeGenerator.addAndGet(PROBE_INCREMENT);//intprobe=(p==0)?1:p;//skip0//U.putInt(t,PROBE,probe);ThreadLocalRandom.current();//forceinitializationh=getProbe();wasUncontended=true;}booleancollide=false;//Trueiflastslotnonemptydone:for(;;){Cell[]cs;Cellc;intn;longv;//当前cells不为空的时候,会优先使用cells进行更新//TOOD什么情况下cells!=null但是cells.length==0呢?if((cs=cells)!=null&&(n=cs.length)>0){//通过probe与n-1与操作找到当前线程对应的cells数组的下标,如果为空,则需要加锁进行初始化Cell对象if((c=cs[(n-1)&h])==null){//乐观锁判断if(cellsBusy==0){//TrytoattachnewCell//乐观创建Cell对象,为什么在这里创建而不是在下面获取到锁之后创建呢?主要是为了减少加锁中的操作开销减小锁粒度Cellr=newCell(x);//Optimisticallycreate//获取自旋锁if(cellsBusy==0&&casCellsBusy()){try{//Recheckunderlock//拿到锁之后,需要再判断下,因为可能有其他线程在casCellsBusy之前已经执行过下面的代码了。Cell[]rs;intm,j;//这里为什么需要再判断cells不为null且length不大于0呢?难道从外层的if进来之后,cells可能重新被置为null?if((rs=cells)!=null&&(m=rs.length)>0&&//再check下对应的cell有没有创建了rs[j=(m-1)&h]==null){//没有创建则使用我们创建的初始值为x的cell赋值rs[j]=r;//退出循环breakdone;}}finally{//释放锁cellsBusy=0;}continue;//Slotisnownon-empty}}collide=false;}//cells刚创建的时候,wasUncontended为true则会走到下面的cas//LongAdder中调用add如果addcellcas失败,wasUncontended会是false,则不会再尝试cas,直接走到修改probe再重试的逻辑elseif(!wasUncontended)//CASalreadyknowntofailwasUncontended=true;//Continueafterrehash//进行一次cas当前cell的尝试elseif(c.cas(v=c.value,(fn==null)?v+x:fn.applyAsLong(v,x)))break;//cas失败并且当前数组数量已经大于等于cpu数量了或者发生了扩容,则elseif(n>=NCPU||cells!=cs)//cell数组数量大于等于cpu数量后不再扩容,或者已经被其他线程扩容过了,都重置collide变量标记位当前没有冲突,这样就不会走到下面的扩容逻辑中了collide=false;//Atmaxsizeorstaleelseif(!collide)//collide为false的时候,就会进行修改probe并重试并修改collide为true,这样下次就会走到下面的elseif也就是扩容逻辑。collide=true;//当和其他线程出现碰撞的时候,会进行扩容,这里也加上自旋锁elseif(cellsBusy==0&&casCellsBusy()){try{//doublecheck,防止其他线程已经进行了修改if(cells==cs)//Expandtableunlessstale//扩容cells数组为当前双倍长度,和HashMap不同的是这里的扩容不需要转移数据,因为Striped64对外表示的是总和。cells=Arrays.copyOf(cs,n<<1);}finally{//释放锁cellsBusy=0;}collide=false;continue;//Retrywithexpandedtable}//走到这里说明之前出现了锁竞争,通过类似随机数的调整修改当前线程的probe值,来尝试减少冲突h=advanceProbe(h);}//cells为空的时候,需要进行cells数组初始化,cellsBusy变量起到了自旋锁的作用,成功cascellsBusy从0到1的线程,会负责初始化cells数组elseif(cellsBusy==0&&cells==cs&&casCellsBusy()){try{//Initializetable//再check下,这是防止判断cellsBusy==0&&cells==cs后还没执行casCellsBusy时,有其他线程先完成了初始化//如果是这种情况,cells和cs不是相同的值,则会在下次循环在上面的if语句中处理if(cells==cs){//数组默认大小为2Cell[]rs=newCell[2];//等同于用prob取余获取当前线程对应的cell,创建Cell对象rs[h&1]=newCell(x);//把新创建的数组赋值给Striped64的cells字段cells=rs;breakdone;}}finally{//释放锁cellsBusy=0;}}//最后的降级,会尝试使用basecas更新,如果cas成功,返回,否则继续循环重试//发生的时机//Fallbackonusingbaseelseif(casBase(v=base,(fn==null)?v+x:fn.applyAsLong(v,x)))breakdone;}}}

AtomicLong和LongAdder的选择

大部分情况下AtomicLong都能满足需求,通过AtomicLong我们能够实现原子更新(incrementAndGet, compareAndSet等方法),保证线程安全等

当写多读少,且写可能出现较多竞争时,可以考虑使用LongAdder,适用的场景例如有请求次数统计这样的监控场景

对内存占用比较敏感时,更适合用AtomicLong

作者: bytejava

更多参考资料

Optimistic lock, pessimistic lock, CAS lock, spin lock, do you know?

2个5相加,列乘法算式为 [ ] A.3+5B.3×5C.5× 快手哪里查看通讯录好友 查看通讯录好友位置介绍 大学生平均每月生活费 职工福利发什么 协商解除的补偿方案可以低于法定标准吗? 一刻相册“碰一碰”功能如何使用?一刻相册“碰一碰”功能使用教程_百度... 声屏障的种类都有哪些? UG怎么按尺寸放大缩小啊 在哪里调整UG软件中的缩放功能? 房地产预售后多久交房 房地产预售房交房时间 一般开盘多久后交房 商品房交房要多久 新房一般多久可以交房 房子从卖到交房要多久 买房为什么交房都要几年 房地产一般提前多久交房 兴业趋势基金怎么样? 玉树藏族民俗禁忌 男人硬不了的原因是什么 三步教会你免费白嫖 iPad 笔记软件 Notability,永久解锁教程来了! 手把手教你提交Jar包到Maven公共仓库 Starter是什么意思-搜狗输入法 startai怎么安装到psstarter安装教程 狗狗不吃还干呕是为什么 狗狗为什么干呕不吃 女孩子以祺字取最佳的名字 每天晚上睡觉排气是胃癌前兆吗 EXCEL排序的数字只认第一个数字怎么办? 金属回收哪方面能有些折扣? 重庆发改委地址怎么没有了呢 重庆市发展和改革委员会主要职责 区级十五五规划 微信在哪删除使用过的小程序 小程序记录删除方法介绍 毡毯的意思是什么 毡毯是指什么 腰滑脱了怎么办? 电饭煲怎样用才能省时间 怎样使用自动电饭煲? 电饭煲的使用方法有什么呢? 使用电饭煲有哪些步骤呢? 沈阳市铁友绗缝机械设备厂(二)保温被绗缝机 蚕丝被大概多少钱一斤 杭州蚕丝被品牌 杭州蚕丝被厂家 杭州有哪些蚕丝被品牌【品牌库】_百... 浙江蚕丝被品牌 浙江蚕丝被厂家 浙江有哪些蚕丝被品牌【品牌库】_百... 什么情况下不适合使用创可贴? 棉絮哪个牌子好 ...品牌排行榜,蚕丝被十大名牌推荐_中国10大蚕丝被什么品牌的好_蚕丝... 微信公众号怎么开通微店 微信公众号开微店教程 龙虾蒸芋头的做法 5岁儿童有多少颗牙齿 长桌变圆桌方法有哪些 大理石圆桌怎么打磨 茶几如何配沙发 -3的3分之1次方除以(6的3分之4次方)2分之3次方 用幂的形式表示 请问兰州西站几个出站口?哪个出口距离拉面馆近,而且面馆多?! 老同事三十年后相遇在兰州什么地方聊聊吃饭比较合适? 欧陆文明3基本信息 欧陆传奇游戏中如何才能增加建筑队列 欧陆传奇是战争策略类型的还是什么类型的 欧陆传奇新手FAQ 欧陆传奇游戏简介 客厅的沙发、茶几怎么搭配,看完这篇就懂了 为什么内存总线也叫数据总线? 孩子一个月不去幼儿园需要交管理费吗? 一整个月不上学幼儿园需缴费吗 ...如果我们以协商好下回少汇我们100元 那还需要挂账吗 发票少开下次补可以吗 某公司购买1000元的办公用品,如果不开发票只需付900元,而如果开发票 医生说我的牙齿要装个牙套,价格是1200元,但我不需要开发票的话可以少... 钻石公主号新增39例肺炎确诊 停泊在哪里 ...中python3使用multiprocessing.Pool时出现的问题解决方案 黄山山脚下有哪些景点 ...在银行贷款了,我可以像法院起诉并申请财产保全吗? 对方欠钱不还,房产都在银行抵押贷款,再无财产保全,起诉后能申请财产保... 有个朋友,他把还在银行按揭的房子抵押给我,现在他还不了钱,我可以向... 对喜欢的人能有啥起床气 夏天不能没有「冬阴功」的味道 国外承包工程报关 龙起洪荒章节目录 刚买了个ipod shuffle 插上电脑还有ipod 的标志,过一会就没有了,充电... 奥林巴斯E-PM2单头套机(17mm)操作功能 舞蹈学专业就业方向是什么 sometimes可以用于否定句吗,为什么? 91年属羊婚姻特征特点 合页合页的安装要点 哦可爱的蓝精灵英文歌词 作文 哥哥的超能力600字 魔兽世界关于4.3血DK的PK输出手法问题 4.3PVP DZ DK LR 哪个犀利点? 咔叽探险队火焰女神技能怎么样_火焰女神技能介绍
声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com