Pool即池子,我们常听到与池子相关的优化程序性能的技术。比如数据库连接池,TCP连接池,线程池等。本文介绍的是sync.Pool,它是Go标准库中提供的一个通用的Pool数据结构,可以使用它创建池化的对象。sync.Pool数据类型的对象用来保存一组可独立访问的临时对象,注意它是临时的,也就是说sync.Pool中保存的对象会在将来的某个时间从sync.Pool中移除掉,如果也没有被其他对象引用的话,该对象会被垃圾回收掉。
「可以提升程序性能」:使用sync.Pool可以提升程序性能,Go是一个自带GC的语言,使用起来没有心智负担,我们想使用一个对象就创建一个对象,想使用多个就创建多个,不用关心对象的释放问题。因为有GC程序会帮助我们自动将不再使用的对象的内存回收掉。但是在使用Go开发高性能的应用程序的时候,需要考虑GC执行垃圾回收给我们带来的影响。在GC执行垃圾回收的过程中需要STW(stop-the-world)时间,如果创建有大量的对象,GC程序在检查这些对象是不是垃圾的时候需要花费很多时间。进而对我们的业务功能造成性能影响。
说明一点,上面说的对象默认指的是堆上的对象。在Go语言中对象按分配位置分为两种,一种是分配在栈上,另一种分配在堆上。分配在栈上的对象不需要GC回收,因为随着函数的调用和返回,栈上分配的对象也不在存在了。GC程序处理的是分配在堆上的对象,因为分配在堆上的对象是长期存在的,如果不进行释放,会在程序的整个生命周期内都存在。所以需要把堆上不再使用的对象尽快释放,减少内存占用。
频繁的创建对象,GC每次需要检查这些对象是否可以回收,然后释放对象。sync.Pool采用对象池的形式,把不用的对象回收起来,而不是直接GC掉,下次再使用的时候不用重新创建对象,使用对象池已有的对象。这样做有两方面收益:
sync.Pool数据类型比较简单,对外提供的方法只有3个:New、Get和Put
sync.Pool是一个结构体,它有一个可导出的字段New,该字段的类型是func() interface{}函数。当使用Get方法从对象池中获取一个元素的时候,如果池子中没有存储的元素,会调用New方法创建一个新的对象。New字段不是必须的,如果没有设置New字段池子中也没有存储的元素时,调用Get方法会返回nil值。New方法可以理解成创建一个对象的构造函数,用于创建对象。
调用Get方法,会从Pool中取走一个元素,这个元素会从Pool中移除,返回给调用者。如果Pool中没有元素了,Pool.New字段设置有值,会调用Pool.New方法创建一个对象返回。如果也没有设置Pool.New,将会返回一个nil.

Put方法将一个元素返回给Pool, Pool会把这个元素保存起来,Get的时候可以复用。如果Put一个nil值,Pool会忽略这个值。

下面举一个实例,熟悉对sync.Pool的使用
package main
import (
"fmt"
"sync"
)
var pool *sync.Pool
type Person struct {
Name string
Age int
}
func init() {
pool = &sync.Pool{
New: func() interface{} {
fmt.Println("creating a new person")
return new(Person)
},
}
}
func main() {
person := pool.Get().(*Person)
fmt.Printf("get an object from pool:%v\n", person)
person.Name = "my"
person.Age = 8
pool.Put(person)
person = pool.Get().(*Person)
fmt.Printf("get an object from pool:%v\n", person)
person = pool.Get().(*Person)
fmt.Printf("get an object from pool:%v\n", person)
}
输出结果为:
creating a new person
get an object from pool:&{ 0}
get an object from pool:&{my 8}
creating a new person
get an object from pool:&{ 0}
从输出结果可以看到,程序3次从Pool中获取元素,实际上执行了2次New函数调用。第一次从Pool中获取元素,由于Pool中还没有元素,通过New方法产生一个元素,然后修改person的Name和Age值,放回到Pool中,第二次在从Pool中获取元素,拿到到前面放回的去的元素,所以没有调用New新创建一个,通过打印输出,也可以看到获取到的元素是前面放回去的。第三次从Pool中获取元素的时候,已经没有元素了,将会调用New方法新产生一个,输出有creating a new person可以证实这个。
下面通过源码分析Pool是如何实现的,「需要说明的是下面的源码是Go的1.14.2版本」sync.Pool的数据结构比较复杂,这里统一将sync.Pool中涉及到的数据结构放在一起,方便理解。整个sync.Pool结构图如下图所示,可以结合这张图和下面的源码及解析一起看。

sync.Pool结构图
sync.Pool数据结构:local和victim是sync.Pool最重要的2个字段,local是一个poolLocal数组的指针,localSize代表这个数组的大小,它的大小是goroutine中P的数量。每个P都有一个ID,这个ID的值对应着poolLoca数组的下标索引。所以poolLocal数组大小最大为runtime.GOMAXPROCS(0)。victim对应local,它也是一个poolLocal数组的指针,victimSize是victim数组的大小。每次垃圾回收的时候,Pool会把victim中的对象清理掉,然后把local的数据赋值给victim,并把local设置为nil,victim像一个中转站,里面的数据可以被捡回来重新使用,也可能被GC回收掉。
type Pool struct {
noCopy noCopy
// local是一个指向[p]poolLocal的指针,它指向的是一个数组,此数组中的元素是poolLocal类型
// 数组的大小是固定的,数量与p的数量是相同的,也是每个p对应数组中的1个元素
local unsafe.Pointer
// local数组的大小
localSize uintptr
// victim类型与local是一样的,可以理解为local的中转站,过渡存储local用的
victim unsafe.Pointer
// victim数组的大小
victimSize uintptr
// New方法,返回的空接口类型,该字段也是Pool唯一暴露给外界的字段
// New方法可以赋值一个能够产生值的函数,在调用Get方法的时候可以用
// New方法来产生一个value,如果没有给New赋值,调用Get时将返回nil.
New func() interface{}
}
poolLocal结构:poolLocal是sync.Pool中local或victim指向数组中的元素类型,里面有一个pad数组填充,使得poolLocal的大小构成128字节的整数倍,是为了防止在cache line上分配多个poolLocalInternal从而造成伪共享。
type poolLocal struct {
poolLocalInternal
// pad是填充用的,防止伪共享,让整个poolLocal构成128字节的整数倍
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
poolLocalInternal结构:它包含private和shared两个字段,private是一个私有对象,只能被拥有它的P使用,而一个P同时只能执行一个G, 所以G访问private不需要加锁,不存在并发冲突。shared是一个双端队列,它既能被与之绑定的P访问和修改,也能被其他的P访问和修改,绑定的P可以从队头加入和出队元素,其他的P只能从队尾出队元素。
// poolLocal的内部结构,Pool中的poolLocal是对poolLocalInternal的简单包装
type poolLocalInternal struct {
// private是一个私有对象,从名字可以看出来,它只能被拥有它的P使用
// Pool中也说明了,每个P有1个poolLocal,也即有一个poolLocalInternal
// 这里的private只能被他属于的P使用
private interface{} // Can be used only by the respective P.
// shared是一个双端队列,它既能被与之绑定的P访问和修改,也能被其他的P访问和修改
// 绑定的P可以从对头入队加和出队元素,其他的P只能从队尾出队元素。
shared poolChain
}
poolChain结构:poolChain是一个双向链表的结构体头,它保存着双向链表的头节点和尾节点指针,链表中节点元素类型是poolChainElt。这里需要注意一下,按照我们常规数据结构的理解,head指针指向的位置在链表最左边,tail指针指向的位置在最右边,但是这里刚好是相反的,这里的head指针指向链表的最右边,tail指向最左边。每添加一个元素的时候,添加在head节点的下一个节点,然后将head指向刚添加的节点。
// poolChain是一个双向链表的结构体头,它保存着双向链表的头节点和尾节点指针,poolChainElt
// 是链表中每个节点的类型结构,该结构类型中有2个字段,headTail uint64和 vals []eface,
// headTail作用后面有描述,vals就是存储元素的切片。vals的大小在每个poolChainElt是不同的,
// 相邻的2个直接大小存在着2倍的关系。
type poolChain struct {
// head只会被1个生产者调用,即Put操作,只在头部添加元素,不需要强同步
head *poolChainElt
// tail被消费者调用,从尾部获取元素,可能有多个消费者同时获取,所以操作必须是原子的
tail *poolChainElt
}
poolChainElt结构:双向链表中的节点结构,它有一个存储数据的poolDequeue队列,next和prev分别是指向前后节点的指针。poolDequeue中vals字段存放元素,vals是一个固定大小的切片,相邻的两个poolChainElt中的vals大小存在2倍关系,第一个poolChainElt中vals大小是8,可以结合上面pool结构图看,第二个poolChainElt中的size是16,最大值为2^30。
type poolChainElt struct {
// 存数据的队列,是一个固定大小的切片
poolDequeue
// next和prev是双向链表的指针,next被生产者写,消费者读,它的值只会从
// nil变成非nil, prev被消费者写,生产者读,它的值只会从非nil变成nil
next, prev *poolChainElt
}
poolDequeue结构:poolDequeue是一个无锁的、固定大小的,单个生产者、多个消费者的队列。poolDequeue无锁实现是Pool的精髓,巧妙采用CAS去掉了1.13版本之前的有锁实现。它有headTail和vals两个字段,headTail占8个字节,高4字节表示的是head在vals中的下标位置,低4字节表示的是tail在vals中的下标位置。生产者每生产1个数据,head+1,消费者每取走队列中1个数据,tail+1.当tail与head相等的时候,说明队列中没有元素了,当tail+len(vals)=head的时候,说明队列满了。
// poolDequeue是一个无锁的、固定大小的、单个生产者、多个消费者的队列
// 生产者只能从队头向队列添加数据或者从队头取走数据,单个或多个消费者可以
// 可用队尾取走数据
type poolDequeue struct {
// headTail占8个字节,分为两部分,高位4字节和低位4字节,高4字节表示的
// 32位int数据描述的是head下标,低4字节表示的是32位int数据描述的是tail
// 下标,这里的下标说的都是下面vals数组的下标,vals[tail,head)下标范围
// 内是有数据的,可以被消费者消费。生产者每生产1个数据,head会+1, 消费者
// 每取走队列中1个数据,tail+1, 当tail赶上head的时候,说明队列中没有数
// 据了,头部索引存储在最高有效位中,因此我们可以原子地添加它,溢出并不会产生
// 别的影响。
headTail uint64
// vals是一个存储数据的环形队列,数据可以是任何类型,因为定义的类型是interface{}
// 它的大小必须是2整数次幂。vals[i].typ为nil,表示该下标位置的没有放数据,typ不是
// nil表示该位置已放有数据。在tail还未设置为i+1和vals[i].typ为非nil前,vals中的
// 位置i视为还在占用状态。当前消费者取走数据或是生产者读取走数据后,vals[i]将自动被
// 它们设置为nil.
vals []eface
}
type eface struct {
typ, val unsafe.Pointer
}
Get先从本地local的private中获取元素,此private只能被当前的P访问,一个P同时只能运行一个G, 所以直接访问l.privated不需要加锁。如果local.private中没有元素了,尝试从local.shared队列的头部弹出一个元素,如果local.shared中也没有元素了,则从其他P对应的shared中偷取一个,如果其他P对应的shared中也没有元素了,再检查victim中是否有元素,如果还是没有,设置了New方法,将调用New方法产生一个。如果没有设置New方法,将返回nil. 整个Get流程如下图所示,结合流程图和源码解析一起看,更容易理解。

// Get方法从Pool中返回一个元素,并将它从Pool中移除,调用者不要假定放入(Put方法)Pool中元素和从里面
// 取(Get)出来的元素有任何关系,如果Pool中没有缓存的元素了并且P.New没有赋值将会返回nil.否则调用New
// 方法产生一个对象返回
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
// 把当前的goroutine固定在当前的P上,返回当前的P上的*poolLocal
l, pid := p.pin()
// 先从本地local的private字段获取元素,此private只能被当前的P访问
// 其他P是不能访问的,一个P同一时间只能有1个goroutine在运行,所以
// 直接访问l.private并不需要加锁
x := l.private
l.private = nil
// private中没有元素
if x == nil {
// 尝试从当前的本地local.shared中出队一个元素,x是从队头弹出的。
x, _ = l.shared.popHead()
if x == nil {
// 如果从local.shared也没有拿到,则从其他P中的l.shared中偷1个
// 如果shared中也没有,产生从victim中获取
x = p.getSlow(pid)
}
}
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 走到这里说明,private, local.shared, 其他P中的local.shared, 本地local.victim,
// 其他P中的local.victim都没有缓存了,如果设置New函数,调用New函数创建一个元素返回
if x == nil && p.New != nil {
x = p.New()
}
// 走到这里说明没有设置New函数,返回nil
return x
}
pin方法会调用runtime_procPin方法获取当前运行的G绑定到当前的P上,禁止抢占,返回关联P的pid. 然后原子操作取出p.localSize值与Pid进行比较,正常情况下pid是不会比localSize大,如果不在localSize范围内,说明程序修改过P的大小,会走到pinSlow处理逻辑。
// 将当前的G定在与它关联的P上,禁止抢占,返回关联P对应的poolLocal对象和P的id
// 调用者必须执行runtime_procUnpin操作当结束对pool操作的时候
func (p *Pool) pin() (*poolLocal, int) {
// 获取运行当前G,与之关联的P的id
pid := runtime_procPin()
// 检查localSize是不是比pid大,正常情况下p.localSize=max(pid)+1
// 因为pid是P的id,如果P的数量为n,则pid的范围为[0,n), 如果pid的值不在
// localSize范围内,说明程序修改过P的数量,这里就会走到pinSlow逻辑
// pinSlow逻辑主要是重新分配p的local和localSize
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}
pinSlow会再次检查pid是否在localSize范围内,如果不在,将会重新分配poolLocal数组给local, 初始化localSize大小,并将p加入到全局的allPools切片中,allPools维护了程序中所有的sync.Pool对象。
func (p *Pool) pinSlow() (*poolLocal, int) {
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
pid := runtime_procPin()
// 已经加了allPoolsMu全局锁,这里可以直接读取localSize和local,不存在data race
s := p.localSize
l := p.local
// 再次判断pid的值是否在正常的范围内,刚才前面解除了禁止抢占,等现在再拿到pid的时候
// 可能不是之前的pid了
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 将p加入到全局pool allPools中
if p.local == nil {
allPools = append(allPools, p)
}
// GOMAXPROCS传0返回值是当前可以并发运行的CPU数
size := runtime.GOMAXPROCS(0)
// 创建一个新的poolLocal切片,并复制给当前的p
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
// 返回当前的P对应的localPool和pid的值
return &local[pid], pid
}
// l存储的是local的起始地址,local是一个poolLocal数组,所以根据起始位置+poolLocal的偏移量
// 可以获取到第i个索引中的poolLocal值
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}
继续分析Get中的popHead操作,popHead从poolChain中获取一个数据,优先从头节点head指向的环形队列中获取,如果没有获取到,再从head前一个节点指向的环形队列中获取,直到查询完所有的队列都没有,返回失败。整个处理过程是不断的从链表的head节点向尾节点遍历,对遍历到的每个节点执行popHead, 这个节点类型是poolDequeue, poolDequeue的popHead操作见下面的源码解析,有详细的注释说明,这里在具体细说。
// popHead从poolChain获取一个数据,优先从头部的环形队列中获取,
// 如果没有获取到,在从head前一个节点指向的环形队列中获取,直到
// 查询完所有的队列都没有,返回获取失败false
func (c *poolChain) popHead() (interface{}, bool) {
d := c.head
// 从head节点开始查找
for d != nil {
if val, ok := d.popHead(); ok {
return val, ok
}
// 继续查找前一个队列,只要d非空即没有查到tail位置,一直
// 查找,获取到了数据就提前返回
d = loadPoolChainElt(&d.prev)
}
// 走到这里表示没有获取数据,返回失败
return nil, false
}
// popHead从队列的头部返回一个元素,如果队列为空,第2个返回值将返回false, 注意该函数
// 只能被单个生产者调用,多个生产者即G同时调用,会引发数据竞争
func (d *poolDequeue) popHead() (interface{}, bool) {
var slot *eface
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// 下标tail和head相等了,说明tail追赶上了head,即将vals中的元素取完了
if tail == head {
// Queue is empty.
return nil, false
}
head--
// 构建一个新的uint64,head是-1后的值
ptrs2 := d.pack(head, tail)
// 进一步比较headTail和ptrs,如果相同说明没有修改它,将其设置为ptrs2
// 这时可以放心大胆的对head进行操作,获取里面的元素了,如果这段时间headTail
// 有人修改,继续进行下一轮循环处理
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// 这里可以放心大胆的读取vals中head下标处的元素了
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 这里判断slot中的val是不是dequeueNil型,因为在pushHead中有肯能会放入,作占位用
val := *(*interface{})(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// 将slot填充为nil值,因为popHead和pushHead不会同时被调用,所以这里可以放心大胆对其进行
// 操作,还有一点是,前面已经重新设置了head值了,popTail已经不可能访问到slot这里,也不用担心
// popTail与popHead在这里的冲突问题
*slot = eface{}
return val, true
}
如果shared中的popHead方法也没有获取到值,将会调下面的getSlow方法。先尝试从其他的P对应的poolLocal中偷一个元素,尝试的顺序是从当前pid+1个索引位置开始的,会对sync.local检查一圈。对每个poolLocal检查的是shared中的元素,看尾部有没有元素,如果有从尾部弹出一个元素。如果检查完所有的poolLocal中的shared都没有找到,接下来将在当前本地poolLocal中的victim的private查找,如果还是没有,就检查它的victim的shared是否有,最后检查其他P对应的poolLocal中victim中的shared是否有数据。
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 尝试从其他P中偷一个元素,尝试P的顺序是从当前pid+1个索引位置开始对应的
// poolLocal中的shared开始查找是否有元素,在shared中的查询方式是从
// 它的尾部弹出一个元素,如果shared队列中没有,继续尝试下一个位置,直到
// 循环检查一圈都没有,走victim中检查
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
// 下面尝试从受害中缓存victim中查找是否元素,查找的位置是从pid索引位置开始的poolLocal
// 产生从它的shared尾部弹出一个元素,如果有就返回,如果没有就尝试下一个位置的poolLocal
locals = p.victim
l := indexLocal(locals, pid)
// 检查victim.private是否有元素,有就返回
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 这里将p.victimSize设置为0,是为了减少后序调用getSlow,不用再一次遍历
// victim数组了,直接返回nil, 加快处理速度
atomic.StoreUintptr(&p.victimSize, 0)
// 尝试了所有的local.shared和victim.private, victim.shared都没有找到,返回nil
return nil
}
// popTail从poolChain中获取一个数据返回,第一个返回值表示返回的数据,第二个
// 返回值表示是否成功获取到了数据,如果没有获取到,将返回false
func (c *poolChain) popTail() (interface{}, bool) {
d := loadPoolChainElt(&c.tail)
// d为nil,表示c.tail还未初始化,比如第一次执行Get操作的时候就会发生,这个时候
// 还没有Put数据进去,所以获取失败
if d == nil {
return nil, false
}
for {
// 需要注意这里先要进行loadPoolChainElt操作再执行d.popTail操作,
// 这两条语句不能颠倒顺序,一般来说,d可能短暂为空,如果在执行pop
// 且执行失败了,但是在执行前d2为非空。这种情况d永久会为空了,需要将
// d从poolChain中移除掉
d2 := loadPoolChainElt(&d.next)
if val, ok := d.popTail(); ok {
return val, ok
}
if d2 == nil {
// 走完整个链表了,还是没有获取到数据,直接返回nil,false
return nil, false
}
// 当前tail执行的节点中的环形队列中已经没有数据可以获取了,尝试tail的
// next节点中获取,因为当前的tail节点永远不会再有数据,所以把它从poolChain
// 移除掉,下次再执行popTail的时候,就不用检查这个空队列了。
// 这里采用了CAS操作,因为有多个消费者可能并行执行popTail.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// 将d从poolChain中移除,以便在gc的时候可以将d回收掉,
// 那为啥在上面的popHead中没有将d从poolChain中移除掉呢?
// 因为popHead中的d可以留着,在后序的pushHead中可以继续
// 使用,而popTail中的d永远不会再被使用了,因为没有pushTail操作
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
上面的popTail对d的处理做了优化,将d从poolChain中移除,以便在gc的时候可以将d回收掉,那为啥在上面的popHead中没有将d从poolChain中移除掉呢?因为popHead中的d可以留着,在后序的pushHead中可以继续使用,而popTail中d永远不会再被使用了,因为没有pushTail操作。
Put操作处理逻辑比较简单,优先将元素放在本地的private,如果private字段已经有值了,会将元素添加到本地的shared队列中。整个处理流程如下图,可以结合下面的源码解析一起看。

// Put将对象放回到池子p中
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
if race.Enabled {
if fastrand()%4 == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 将当前的G钉在与之前关联的M上,优先将x归还到到private上
// 如果private已有元素,放到本地的local.shared上,从队头
// 放回
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
// 解除G和M的绑定
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
poolChain pushhead先在head节点添加元素,如果head节点不存在,说明整个poolChain为空,将创建一个poolChainElt节点,并初始化其大小为8.并将元素添加到poolChainElt中,如果poolChaintElt满了会添加失败,这时将新建一个poolChainElt节点,然后将元素加入到新节点。
// 向poolChain中添加元素,先在head节点添加,如果head节点不存在,说明整个poolChain为空
// 就创建第一个poolChainElt节点,并初始化其大小为8,c.head和tail都指向该poolChainElt
// 并将元素val添加到poolChainElt中,在添加的时候如果poolChainElt满了会失败,这时会新建
// 1个poolChainElt,初始大小为当前c.head中元素个数的2倍
func (c *poolChain) pushHead(val interface{}) {
d := c.head
if d == nil {
// 创建第一个poolChainElt节点,节点可容纳的元素个数为8个
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将元素添加到poolChainElt中,如果切片满了会添加失败,继续走后面的处理
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
// 创建1个新的poolChainElt节点,节点可以容纳的元素是head节点的2倍,
// 当然不能超过2^30
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
// 下面将d2加到链表的head节点的下一个节点,这里理解的时候要注意下,
// 与我们通常链表不太一样,这里的head指向的是链表的尾部,反而tail指向的
// 的是链表的头部,所以采用尾插法将新的节点加入到链表尾,更新head的值
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
pushHead是Put操作的主要处理逻辑,将元素添加到vals中,如果vals满了,则添加失败,该函数不能被多个G同时调用,否则存在data race。
// pushHead真正将元素添加到vals中,如果vals满了,则添加失败,该函数
// 不能被重入调用,就是不能被多个G同时进行操作,否则存在data race
func (d *poolDequeue) pushHead(val interface{}) bool {
// headTail是一个uint64类型,也就是8个字节,高位的4个字节表示
// 表示待添加的元素在vals中的切片下标位置,低位的4个字节表示待取走的
// 元素在vals中的切片下标位置,这里采用原子操作一次性获取headTail,然后
// 解析出head和tail的位置,可以想象,如果定义2个uint32的变量,需要用
// mutex锁保护
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
// tail是比head小的,每次put的时候,head会+1,消费取走的时候tail+1
// 所以如果tail+切片vals的长度赶上了head,说明[tail,head)位置都被
// 填充了元素,即vals满了,添加失败,直接返回
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
// 获取到vals中head下标中的元素
slot := &d.vals[head&uint32(len(d.vals)-1)]
// 进一步检查slot.type判断是否为非空,可以结合popTail看,非空说明
// 有其他的G正在清理vals中tail的位置,然视为满了处理
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
return false
}
// val为空,创建1个*dequeueNil给它
if val == nil {
val = dequeueNil(nil)
}
*(*interface{})(unsafe.Pointer(slot)) = val
// 将head的位置+1,只影响高位的4字节,对低位的4字节,即tail的位置不影响
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
前面介绍中说了,使用sync.Pool可以通过复用对象提升程序的性能,那sync.Pool中的对象是如何被清理的呢?sync.Pool中的对象并不是一直保持不释放的。否则对象太多了,会引起内存溢出。Go中的GC对sync.Pool做了专门的处理。在pool.go的init函数里,向GC注册了清理机制,源码如下.poolCleanup 会在 STW 阶段被调用。主要是将 local 和 victim 作交换,那么不至于GC 把所有的 Pool 都清空了,而是需要两个 GC 周期才会被释放.

// 注册pool清理回调,在gc的时候,会调用poolCleanup
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
// 清理pool函数,对oldPools和allPools的操作都是没有加锁的,因为这里执行的时候
// 已经STW了,不存在竞争
func poolCleanup() {
// oldPools是一个全局变量,是一个保存*sync.Pool的切片,这些
// 切片元素都是等待被gc清理的,当执行完下面的循环之后,就被清理了
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// allPools是当前新产生的*sync.Pool元素集合,里面存储的元素跟前面的
// oldPools是不同的,当该轮gc执行的时候,会将allPools中所有的Pool中的
// local元素移动到victim受害者缓存中,并将local缓存清空
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 交换oldPools和allPools,处理的很精炼,为什么要这么做呢?
// 进程起来,运行过程中申请了sync.Pool创建的变量vp,然后执行gc,此时oldPools是空的
// allPools是非空的,里面装的是vp, 当该函数被调用的时候,将vp中的元素从local移动到victim
// 即在第一次gc的时候,vp中的元素没有被垃圾回收,接下来执行下面这个语句之后,allPools被清空
// oldPools里面保存的是vp, 当下次执行gc的时候,oldPools中的内容被清理掉,所以vp中元素生命
// 周期在两次gc之间都是存活的
oldPools, allPools = allPools, nil
}
sync.Pool一般是性能优化的时候会使用,在高性能场景应用的非常多。下面列举2个在标准库中使用的例子。
标准库fmt包中的文件fmt/print.go中ppFree就是一个sync.Pool对象。由于fmt使用的很频繁,采用sync.Pool对已有的对象进行复用,提升性能。
var ppFree = sync.Pool{
New: func() interface{} { return new(pp) },
}
// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
p := ppFree.Get().(*pp)
p.panicking = false
p.erroring = false
p.wrapErrs = false
p.fmt.init(&p.buf)
return p
}
在标准库net包中net/http/server.go文件中,就使用到了sync.Pool。下面提供的2K和4K的两个writer池子,采用了sync.Pool。
var (
bufioReaderPool sync.Pool
bufioWriter2kPool sync.Pool
bufioWriter4kPool sync.Pool
)
func bufioWriterPool(size int) *sync.Pool {
switch size {
case 2 << 10:
return &bufioWriter2kPool
case 4 << 10:
return &bufioWriter4kPool
}
return nil
}
func newBufioWriterSize(w io.Writer, size int) *bufio.Writer {
pool := bufioWriterPool(size)
if pool != nil {
if v := pool.Get(); v != nil {
bw := v.(*bufio.Writer)
bw.Reset(w)
return bw
}
}
return bufio.NewWriterSize(w, size)
}
func putBufioWriter(bw *bufio.Writer) {
bw.Reset(nil)
if pool := bufioWriterPool(bw.Available()); pool != nil {
pool.Put(bw)
}
}
golang的对象池sync.pool源码解读[1]Go 并发编程 — sync.Pool 源码级原理剖析 [2] 终结篇[2]深度解密Go语言之sync.pool[3]Go sync.Pool 浅析[4]
[1]
golang的对象池sync.pool源码解读: https://zhuanlan.zhihu.com/p/99710992
[2]
Go 并发编程 — sync.Pool 源码级原理剖析 [2] 终结篇: https://mp.weixin.qq.com/s/u0HZYgPVec9CET5d4wTPbA
[3]
深度解密Go语言之sync.pool: https://blog.csdn.net/qcrao/article/details/105630345
[4]
Go sync.Pool 浅析: https://segmentfault.com/a/1190000040017989