定义

sync.Pool是一个可以存或取的临时对象池。对外提供New、Get、Put等API,利用mutex支持多线程并发。

目标

sync.Pool解决以下问题:

  1. 增加临时对象的用复用率,减少GC负担
  2. 通过对象的复用,减少内存申请开销,有利于提高一部分性能

实现

这一部分回答如何实现的问题。

关于了解实现,最好的办法就是看代码。

描述

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

各个成员含义如下:

noCopy: 防止sync.Pool被复制

local: poolLocal数组的指针

localSize: poolLocal数组大小

New: 函数指针申请具体的对象,便于用户定制各种类型的对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
}

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

private:private私有池,只能被对应P使用(说明:P是指goroutine执行所占用的处理器,下同)

shared: shared共享池,能被任何P使用

Mutex: 保护shared共享池

pad:poolLocal结构体中特别增加了pad成员,这是为了防止false sharing。

操作

操作分为四种类型:

  1. New
  2. Get
  3. Put
  4. CleanUp

New

这部分主要解决问题:如何创建一个具体对象池?

具体参考代码如下:

1
2
3
4
5
6
7
8
9
// Object Object
type Object struct {
    a int
    b int
}

var pool = sync.Pool{
    New: func() interface{} { return new(Object) },
}

Get

Get解决了如何从具体sync.Pool中获取对象的问题。

获取对象有三个来源:

  1. private池
  2. shared池
  3. 系统的Heap内存

获取对象顺序是先从private池获取,如果不成功则从shared池获取,如果继续不成功,则从Heap中申请一个对象。这是不是有熟悉的味道?在两级cache的情况下,CPU获取数据,先从L1 cache开始,再是L2 cache, 是内存。

具体代码实现如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    }
    l := p.pin() // 绑定private池和P
    x := l.private
    l.private = nil
    runtime_procUnpin() // 去绑定private池和P
    if x == nil { //  private池获取失败
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last] // 从shared池获取最后一个对象 
            l.shared = l.shared[:last] // 从shared池删除最后一个对象
        }
        l.Unlock()
        if x == nil { 
            x = p.getSlow() // pid对应poolLocal没有获取成功,开始遍历整个poolLocal数组
        }
    }
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New() // 从heap申请对象
    }
    return x
}

func (p *Pool) getSlow() (x interface{}) {
    // See the comment in pin regarding ordering of the loads.
    size := atomic.LoadUintptr(&p.localSize) // load-acquire
    local := p.local                         // load-consume
    // Try to steal one element from other procs.
    pid := runtime_procPin()
    runtime_procUnpin()
    for i := 0; i < int(size); i++ { // 遍历poolLocal数组
        l := indexLocal(local, (pid+i+1)%int(size)) // 注意pid+i+1 这样可以从pid+1位置开始整个遍历
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
            l.Unlock()
            break
        }
        l.Unlock()
    }
    return x
}

// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
    pid := runtime_procPin()
    // In pinSlow we store to localSize and then to local, here we load in opposite order.
    // Since we've disabled preemption, GC cannot happen in between.
    // Thus here we must observe local at least as large localSize.
    // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
    s := atomic.LoadUintptr(&p.localSize) // load-acquire
    l := p.local                          // load-consume
    if uintptr(pid) < s {
        return indexLocal(l, pid)
    }
    return p.pinSlow() // 没有对应poolLocal,进入慢路径处理
}

func (p *Pool) pinSlow() *poolLocal {
    // Retry under the mutex.
    // Can not lock the mutex while pinned.
    runtime_procUnpin()
    allPoolsMu.Lock()
    defer allPoolsMu.Unlock()
    pid := runtime_procPin()
    // poolCleanup won't be called while we are pinned.
    s := p.localSize
    l := p.local
    if uintptr(pid) < s { // 根据pid获取poolLocal
        return indexLocal(l, pid) 
    }
    if p.local == nil {
        allPools = append(allPools, p)
    }
    // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
    size := runtime.GOMAXPROCS(0)
    local := make([]poolLocal, size) // 重新分配poolLocal
    atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
    atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
    return &local[pid] // 返回新的poolLocal
}

总结Get主要要点如下:

  1. 先从本P绑定的poolLocal获取对象:先从本poolLocal的private池获取对象,再从本poolLocal的shared池获取对象
  2. 上一步没有成功获取对象,再从其他P的shared池获取对象
  3. 上一步没有成功获取对象,则从Heap申请对象

Put

Put完成将对象放回对象池。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
    if x == nil {
        return
    }
    if race.Enabled {
        if fastrand()%4 == 0 {
            // Randomly drop x on floor.
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    l := p.pin() // 绑定private池和P
    if l.private == nil {
        l.private = x   // 放回private池中
        x = nil
    }
    runtime_procUnpin() // 去绑定private池和P
    if x != nil {
        l.Lock()
        l.shared = append(l.shared, x)  // 放回shared池
        l.Unlock()
    }
    if race.Enabled {
        race.Enable()
    }
}

上面的代码总结如下:

  1. 如果poolLocalInternal的private为空,则将回收的对象放到private池中
  2. 如果poolLocalInternal的private非空,则将回收的对象放到shared池中

CleanUp

CleanUp实现

注册poolCleanup函数。

1
2
3
4
func init() {
   runtime_registerPoolCleanup(poolCleanup)
}

poolCleanup函数具体实现,

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func poolCleanup() {
    // This function is called with the world stopped, at the beginning of a garbage collection.
    // It must not allocate and probably should not call any runtime functions.
    // Defensively zero out everything, 2 reasons:
    // 1. To prevent false retention of whole Pools.
    // 2. If GC happens while a goroutine works with l.shared in Put/Get,
    //    it will retain whole Pool. So next cycle memory consumption would be doubled.
    for i, p := range allPools {
        allPools[i] = nil
        for i := 0; i < int(p.localSize); i++ {
            l := indexLocal(p.local, i)
            l.private = nil
            for j := range l.shared {
                l.shared[j] = nil
            }
            l.shared = nil
        }
        p.local = nil
        p.localSize = 0
    }
    allPools = []*Pool{}
}

CleanUp时机

什么时候进行CleanUp回收对象池?在gc开始前。

具体代码(代码文件为runtime/mgc.go)如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func gcStart(trigger gcTrigger) {
    ... 
    // clearpools before we start the GC. If we wait they memory will not be
    // reclaimed until the next GC cycle.
    clearpools() // 在这里清理sync.Pool

    work.cycles++

    gcController.startCycle()
    work.heapGoal = memstats.next_gc

    // In STW mode, disable scheduling of user Gs. This may also
    // disable scheduling of this goroutine, so it may block as
    // soon as we start the world again.
    if mode != gcBackgroundMode {
        schedEnableUser(false)
    }
    ...
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func clearpools() {
    // clear sync.Pools
    if poolcleanup != nil {
        poolcleanup() // 如果poolcleanup不为空,调用poolcleanup函数
    }

    // Clear central sudog cache.
    // Leave per-P caches alone, they have strictly bounded size.
    // Disconnect cached list before dropping it on the floor,
    // so that a dangling ref to one entry does not pin all of them.
    lock(&sched.sudoglock)
    var sg, sgnext *sudog
    for sg = sched.sudogcache; sg != nil; sg = sgnext {
        sgnext = sg.next
        sg.next = nil
    }
    sched.sudogcache = nil
    unlock(&sched.sudoglock)

    // Clear central defer pools.
    // Leave per-P pools alone, they have strictly bounded size.
    lock(&sched.deferlock)
    for i := range sched.deferpool {
        // disconnect cached list before dropping it on the floor,
        // so that a dangling ref to one entry does not pin all of them.
        var d, dlink *_defer
        for d = sched.deferpool[i]; d != nil; d = dlink {
            dlink = d.link
            d.link = nil
        }
        sched.deferpool[i] = nil
    }
    unlock(&sched.deferlock)
}

总结

总结一下sync.Pool的实现,要点如下:

  1. 提供New定义实现用户自定义对象
  2. 需要使用对象调用Get从对象池获取临时对象,Get优先级首先是本P绑定的poolLocal, 其次是其他P绑定的poolLocal,最后是Heap内存
  3. 对象使用完毕调用Put将临时对象放回对象池
  4. 未被使用的对象会定时GC回收
  5. 对象没有类似于linux cache object对应的free函数

应用

sync.Pool并不是万能药。要根据具体情境而定是否使用sync.Pool。

总结不适合使用sync.Pool的情境,具体如下:

  1. 对象中分配的系统资源如socket,buffer
  2. 对象需要进行异步处理
  3. 对象是组合对象,如存在指针指向其他的对象
  4. 批量对象需要并发处理
  5. 复用对象大小存在的波动,如对象结构成员存在slice

在排除上面情境下,适合使用的sync.Pool应满足以下条件,具体如下:

  1. 对象是buffer或非组合类型如buffer reader, json decode, bufio writer
  2. 对象内存可以重复使用

同时在使用应该注意问题:

  1. Put对象之前完成初始化,避免数据污染带来问题, 这可能带来各种各样的问题
  2. 写代码时要满足one Get, one Put的要求
  3. 注意获取对象后是否存在修改对象内存存局的代码
  4. 关注应用场景是否容易出现Pool竞争的情况
  5. sync.Pool不是万能药,不要拿着锤子,看什么都是钉子

(Ps: 个人能力不足,若有错误不足,欢迎指正!)

参考

  1. sync: Pool example suggests incorrect usage