之前的文章httprouter路由框架为什么高性能提到过一点高性能的原因就是它减少了内存分配。因为分配内存是在堆上分配的,调用mallocgc函数,是有性能消耗的。 而httprouter中使用sync。pool来减少内存分配,减少GC消耗。 那么sync。Pool为什么能做到这一点?做出来哪些减少性能消耗的工作?一、初识sync。Pool sync。Pool是一组可以单独保存和检索的临时对象。存储在Pool中的任意对象可能会自动删除,这个删除过程是不会通知到用户的。 Pool的目的是缓存已分配但没有使用的对象以供之后复用,减轻垃圾回收的压力。而Pool是并发安全的,也就是说,它可以轻松构建高效、线程安全的存储池。 一个例子就是在fmt包中,Pool维护了一个动态大小的临时输出缓冲区存储,存储在负载的时候扩展(多goroutine打印),在静止时缩小。二、使用方法bufferpool:sync。Pool{New:func()interface{}{println(Createnewinstance)returnstruct{}{}},}buffer:bufferpool。Get()bufferpool。Put(buffer) 首先初始化Pool,声明一个创建Pool元素的方法。 然后当使用时申请对象,Get方法会返回Pool已经存在的对象,如果没有,就走New方法来初始化一个对象。 当使用完成后,调用Put方法把对象放回Pool中。 Pool就3个接口,针对所有的对象类型都可以使用。 那么我们来思考一个问题,使用sync。Pool有什么好处? 下面我们来看一个例子varcountint32funcnewBuffer()interface{}{atomic。AddInt32(count,1)buffer:make(〔〕byte,1024)returnbuffer}funcmain(){bufferPool:sync。Pool{New:newBuffer,}workers:10241024varwgsync。WaitGroupfori:0;iworkers;i{wg。Add(1)gofunc(){deferwg。Done()buffer:bufferPool。Get()buffer。(〔〕byte)deferbufferPool。Put(buffer)}()}wg。Wait()fmt。Printf(dbuffercreated,count)} 最终打印结果11buffercreated10buffercreated 多次运行可能会出现不同的结果,但是次数count都不大。如果不是使用Pool来申请,而是直接使用buffer:make(〔〕byte,1024)来申请内存,那么就会申请10241024个对象,造成极大的浪费。而Pool就是对这类对象的复用。三、原理 既然已经知道了复用对象的好处,那么sync。Pool到底是如何实现这一功能的呢? 首先我们来看看Pool的结构typePoolstruct{noCopynoCopylocalunsafe。PointerlocalSizeuintptrvictimunsafe。PointervictimSizeuintptrNewfunc()interface{}}nocopy就是标识不能拷贝,具体原理在读写锁时提到localSize就是cpu处理器的个数,local指向了〔localSize〕poolLocalvictim和victimSize就是在GC后接管local和localSizeNew函数就是初始化对象的方法 poolLocal管理Pool池里的cache元素的关键结构typepoolLocalInternalstruct{privateinterface{}sharedpoolChain}typepoolLocalstruct{poolLocalInternalpad〔128unsafe。Sizeof(poolLocalInternal{})128〕byte}private就是声明的对象shared是双链表结构,用于挂载cache元素pad就是用来填充字符到128字节,用于内存对齐1、获取对象1。1、获取本地pooll,pidp。pin() 这个pin函数hold住了当前goroutine在P上,不允许调度,并且返回P的本地pool和P的id。func(pPool)pin()(poolLocal,int){pid:runtimeprocPin()s:atomic。LoadUintptr(p。localSize)l:p。localifuintptr(pid)s{returnindexLocal(l,pid),pid}returnp。pinSlow()} 这个indexLocal其实就是个索引到本地pool的指针funcindexLocal(lunsafe。Pointer,iint)poolLocal{lp:unsafe。Pointer(uintptr(l)uintptr(i)unsafe。Sizeof(poolLocal{}))return(poolLocal)(lp)} 通常只有第一次执行的时候,p。localSize为0,才会执行p。pinSlow,其他都直接走if返回本地pool了。 pinSlow就是把Pool注册进allPools数组中func(pPool)pinSlow()(poolLocal,int){runtimeprocUnpin()allPoolsMu。Lock()deferallPoolsMu。Unlock()pid:runtimeprocPin()s:p。localSizel:p。localifuintptr(pid)s{returnindexLocal(l,pid),pid}ifp。localnil{allPoolsappend(allPools,p)}size:runtime。GOMAXPROCS(0)local:make(〔〕poolLocal,size)atomic。StorePointer(p。local,unsafe。Pointer(local〔0〕))atomic。StoreUintptr(p。localSize,uintptr(size))returnlocal〔pid〕,pid} 首先先解锁,然后加全局互斥锁var(allPoolsMuMutexallPools〔〕PoololdPools〔〕Pool) allPools就是全局的pool,oldPool就是victim使用的pool。 然后再重新hold住goroutine在P上,二次判断是否直接返回本地pool。 而使用runtime。GOMAXPROCS(0)来获取cpu的数量,也就是P的数量。 这里深入扩展一下,runtimeprocPin其实是runtime包下的procPin的一层封装。funcprocPin()int{g:getg()mp:g。mmp。locksreturnint(mp。p。ptr()。id)} procPin的目的就是为了当前G被抢占了执行权限,也就是说G在M上不走了,而实际核心是mp。locks,在newstack函数里,有这么段代码ifpreempt{if!canPreemptM(thisg。m){gp。stackguard0gp。stack。loStackGuardgogo(gp。sched)}}funccanPreemptM(mpm)bool{returnmp。locks0mp。mallocing0mp。preemptoffmp。p。ptr()。statusPrunning} 这里mp。locks0,所以就只能让G一直执行 而runtimeprocUnpin函数可以猜想的到,就是让mp。lock。1。2、取出对象x:l。privatel。privatenilifxnil{x,l。shared。popHead()ifxnil{xp。getSlow(pid)}} 从private中取出对象,如果取出的对象为nil,那么就尝试从share队列中获取,如果还是nil,就从其他P的队列中取,或者从victim中取。func(pPool)getSlow(pidint)interface{}{size:atomic。LoadUintptr(p。localSize)locals:p。local从其他P中取fori:0;iint(size);i{l:indexLocal(locals,(pidi1)int(size))ifx,:l。shared。popTail();x!nil{returnx}}从victim中取sizeatomic。LoadUintptr(p。victimSize)ifuintptr(pid)size{returnnil}localsp。victiml:indexLocal(locals,pid)ifx:l。private;x!nil{l。privatenilreturnx}fori:0;iint(size);i{l:indexLocal(locals,(pidi)int(size))ifx,:l。shared。popTail();x!nil{returnx}}atomic。StoreUintptr(p。victimSize,0)returnnil}1。3、初始化对象 如果上面几个地方都不存在该对象,那么就调用New函数初始化一个对象返回ifxnilp。New!nil{xp。New()}returnx2、放回对象2。1、空值判断ifxnil{return}2。2、获取本地pooll,:p。pin()2。3、尝试存放数据ifl。privatenil{l。privatexxnil}ifx!nil{放到双向链表l。shared。pushHead(x)}3、victim 看到这里,可能就有点疑惑了,Pool就这两个方法,也没有用到victim。 这个奥秘就在于它注册了init函数,在每次GC的时候调用poolCleanup函数,也就是说每一轮GC都会对所有的Pool做清理工作。funcinit(){runtimeregisterPoolCleanup(poolCleanup)} 而poolCleanup函数就是做pool的迁移funcpoolCleanup(){for,p:rangeoldPools{p。victimnilp。victimSize0}for,p:rangeallPools{p。victimp。localp。victimSizep。localSizep。localnilp。localSize0}oldPools,allPoolsallPools,nil}四、总结 1、sync。Pool是并发安全的,读取数据时会hold住当前的goroutine不被打断 2、sync。Pool不允许复制后使用,因为nocopy 3、sync。Pool不适用于socket长连接或连接池等,因为无法知道连接池的个数,连接池的元素随时可能会被释放。 4、从sync。Pool取出的对象使用完需要放回池中。