Definition
sync.Pool is a pool of temporary objects that can be stored into and retrieved from. It exposes the New, Get, and Put APIs and uses a mutex internally to support concurrent use from multiple goroutines.
Goals
sync.Pool addresses the following problems:
- Increase the reuse rate of temporary objects and so reduce the GC burden
- Reuse objects to cut down on memory allocation overhead, which helps performance (see the benchmark sketch below)
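A minimal, illustrative benchmark sketch of the second point. The buffer type, bufPool, and sink names are invented for this example, and real gains depend heavily on object size and workload:

package pooldemo

import (
	"sync"
	"testing"
)

// buffer is a stand-in for whatever temporary object the program needs.
type buffer struct {
	data [4096]byte
}

var bufPool = sync.Pool{
	New: func() interface{} { return new(buffer) },
}

// sink keeps the compiler from stack-allocating the buffer in the no-pool case.
var sink *buffer

// BenchmarkWithoutPool heap-allocates a fresh buffer on every iteration.
func BenchmarkWithoutPool(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		sink = new(buffer)
	}
}

// BenchmarkWithPool reuses buffers through sync.Pool.
func BenchmarkWithPool(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		buf := bufPool.Get().(*buffer)
		bufPool.Put(buf)
	}
}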
Implementation
This section answers the question of how sync.Pool is implemented.
The best way to understand an implementation is to read the code.
Data structures
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}
The members have the following meanings:
noCopy: prevents sync.Pool from being copied
local: pointer to the poolLocal array
localSize: size of the poolLocal array
New: a user-supplied function that creates a concrete object, so callers can pool objects of any type
// Local per-P Pool appendix.
type poolLocalInternal struct {
	private interface{}   // Can be used only by the respective P.
	shared  []interface{} // Can be used by any P.
	Mutex                 // Protects shared.
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
private: the private pool, usable only by its owning P (here P means the processor a goroutine runs on, same below)
shared: the shared pool, usable by any P
Mutex: protects the shared pool
pad: a padding member added to poolLocal specifically to prevent false sharing
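The Get and Put code quoted below also relies on a small helper, indexLocal, that is not shown in this article: it turns the local base pointer plus an index into a pointer to the i-th poolLocal. In the version of the sync package quoted here it is implemented with pointer arithmetic, roughly as follows:

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	// Address of element i in the [P]poolLocal array that p.local points to.
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}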
Operations
The operations fall into four categories:
- New
- Get
- Put
- CleanUp
New
This part answers the question: how do we create a pool for a concrete object type?
The reference code is as follows:
// Object is the concrete type being pooled.
type Object struct {
	a int
	b int
}

var pool = sync.Pool{
	New: func() interface{} { return new(Object) },
}
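With the pool defined, a typical caller looks roughly like the sketch below (resetting a and b is an assumption about how Object should be reused, not part of the original code):

func useObject() {
	obj := pool.Get().(*Object) // Get returns interface{}, so a type assertion is needed
	obj.a, obj.b = 0, 0         // clear any state left over from a previous user
	// ... work with obj ...
	pool.Put(obj) // hand the object back so later Gets can reuse it
}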
Get
Get answers how to obtain an object from a given sync.Pool.
An object can come from three sources:
- the private pool
- the shared pool
- heap memory
Get tries the private pool first; if that fails it tries the shared pool, and if that fails too it allocates a new object from the heap. Does this feel familiar? It is just like a CPU with two levels of cache fetching data: first the L1 cache, then the L2 cache, and finally main memory.
The implementation is as follows:
func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l := p.pin() // pin the goroutine to its P and get that P's poolLocal
	x := l.private
	l.private = nil
	runtime_procUnpin() // unpin: allow preemption again
	if x == nil { // the private pool had nothing
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]         // take the last object from the shared pool
			l.shared = l.shared[:last] // and remove it from the shared pool
		}
		l.Unlock()
		if x == nil {
			x = p.getSlow() // this P's poolLocal had nothing; scan the whole poolLocal array
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New() // fall back to allocating a fresh object from the heap
	}
	return x
}

func (p *Pool) getSlow() (x interface{}) {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	// Try to steal one element from other procs.
	pid := runtime_procPin()
	runtime_procUnpin()
	for i := 0; i < int(size); i++ { // walk the poolLocal array
		l := indexLocal(local, (pid+i+1)%int(size)) // pid+i+1 starts the scan at pid+1 and wraps around
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}

// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() *poolLocal {
	pid := runtime_procPin()
	// In pinSlow we store to localSize and then to local, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := atomic.LoadUintptr(&p.localSize) // load-acquire
	l := p.local                          // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid)
	}
	return p.pinSlow() // no poolLocal for this pid yet; take the slow path
}

func (p *Pool) pinSlow() *poolLocal {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s { // the array may have been allocated meanwhile; index it by pid
		return indexLocal(l, pid)
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)                          // allocate a fresh poolLocal array
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
	return &local[pid] // return the new poolLocal for this P
}
To summarize, the key points of Get are:
- First try the poolLocal bound to the current P: its private pool, then its shared pool
- If that fails, steal from the shared pool of another P
- If that also fails, allocate a new object from the heap (and when New is nil, Get simply returns nil; see the sketch below)
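One consequence of the final branch in Get above: if New is nil and no pooled object is available, Get returns nil, so callers must handle that case themselves. A sketch, reusing the Object type from the New example (getObject is a made-up helper name):

func getObject(p *sync.Pool) *Object {
	v := p.Get()
	if v == nil {
		// The pool was empty and p.New is nil, so Get returned nil;
		// fall back to a plain heap allocation.
		return new(Object)
	}
	return v.(*Object)
}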
Put
Put returns an object to the pool.
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l := p.pin() // pin the goroutine to its P and get that P's poolLocal
	if l.private == nil {
		l.private = x // store x in the private slot
		x = nil
	}
	runtime_procUnpin() // unpin: allow preemption again
	if x != nil {
		l.Lock()
		l.shared = append(l.shared, x) // the private slot was taken; append to the shared pool
		l.Unlock()
	}
	if race.Enabled {
		race.Enable()
	}
}
The code above can be summarized as follows:
- If poolLocalInternal's private slot is empty, the returned object is stored there
- If poolLocalInternal's private slot is already occupied, the returned object is appended to the shared pool (see the sketch below)
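A small sketch of that branch, assuming both calls run on the same P with an initially empty poolLocal and no preemption in between (reusing pool and Object from earlier):

func demoPut() {
	a, b := new(Object), new(Object)
	pool.Put(a) // the private slot was empty, so a lands there
	pool.Put(b) // the private slot is now occupied, so b is appended to the shared slice
}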
CleanUp
CleanUp implementation
The poolCleanup function is registered at package init time.
func init() {
	runtime_registerPoolCleanup(poolCleanup)
}
The implementation of poolCleanup is:
func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.
	// Defensively zero out everything, 2 reasons:
	// 1. To prevent false retention of whole Pools.
	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
	for i, p := range allPools {
		allPools[i] = nil
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
			l.private = nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
	allPools = []*Pool{}
}
When CleanUp runs
When is CleanUp invoked to reclaim the pools? Right before a garbage collection starts.
The relevant code (from runtime/mgc.go) is:
func gcStart(trigger gcTrigger) {
	...
	// clearpools before we start the GC. If we wait they memory will not be
	// reclaimed until the next GC cycle.
	clearpools() // sync.Pool is cleaned up here
	work.cycles++

	gcController.startCycle()
	work.heapGoal = memstats.next_gc

	// In STW mode, disable scheduling of user Gs. This may also
	// disable scheduling of this goroutine, so it may block as
	// soon as we start the world again.
	if mode != gcBackgroundMode {
		schedEnableUser(false)
	}
	...
}
func clearpools() {
	// clear sync.Pools
	if poolcleanup != nil {
		poolcleanup() // if a cleanup function was registered, call it (this is sync.Pool's poolCleanup)
	}

	// Clear central sudog cache.
	// Leave per-P caches alone, they have strictly bounded size.
	// Disconnect cached list before dropping it on the floor,
	// so that a dangling ref to one entry does not pin all of them.
	lock(&sched.sudoglock)
	var sg, sgnext *sudog
	for sg = sched.sudogcache; sg != nil; sg = sgnext {
		sgnext = sg.next
		sg.next = nil
	}
	sched.sudogcache = nil
	unlock(&sched.sudoglock)

	// Clear central defer pools.
	// Leave per-P pools alone, they have strictly bounded size.
	lock(&sched.deferlock)
	for i := range sched.deferpool {
		// disconnect cached list before dropping it on the floor,
		// so that a dangling ref to one entry does not pin all of them.
		var d, dlink *_defer
		for d = sched.deferpool[i]; d != nil; d = dlink {
			dlink = d.link
			d.link = nil
		}
		sched.deferpool[i] = nil
	}
	unlock(&sched.deferlock)
}
Summary
To sum up, the key points of the sync.Pool implementation are:
- New lets users define how their own object type is created
- To obtain an object, call Get; it prefers the poolLocal bound to the current P, then poolLocals bound to other Ps, and finally a fresh heap allocation
- When finished with an object, call Put to return it to the pool
- Objects left in the pool are reclaimed at GC time (see the sketch below)
- There is no explicit free function for pooled objects, unlike the free routine a Linux cache object (slab) has
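A sketch of those last two points, based on the cleanup behavior described above. The exact outcome is version dependent and not guaranteed by the sync.Pool API (newer runtimes may let objects survive one collection), so treat the comments as typical rather than certain:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	pool := sync.Pool{
		New: func() interface{} { return new(int) },
	}

	x := pool.Get().(*int)
	*x = 42
	pool.Put(x)

	y := pool.Get().(*int)
	fmt.Println(*y) // typically 42: the object we just Put
	pool.Put(y)     // return it again so the pool is non-empty before the GC

	runtime.GC()                    // poolCleanup drops the cached objects at the start of the cycle
	fmt.Println(*pool.Get().(*int)) // typically 0: New allocated a fresh *int
}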
Usage
sync.Pool is not a cure-all; whether to use it depends on the concrete situation.
Situations where sync.Pool is not a good fit include:
- Objects that hold system resources such as sockets or buffers
- Objects that are handed off for asynchronous processing
- Composite objects, e.g. ones holding pointers to other objects
- Batches of objects that need concurrent processing
- Objects whose reused size fluctuates, e.g. a struct member that is a slice (illustrated in the sketch after this list)
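The last point deserves a sketch: when a pooled object holds a slice that occasionally grows very large, the pool keeps the oversized backing array alive for every later user. The record type, recPool, and handle below are invented for illustration (assumes the sync import):

type record struct {
	buf []byte
}

var recPool = sync.Pool{
	New: func() interface{} { return &record{buf: make([]byte, 0, 1024)} },
}

func handle(payload []byte) {
	r := recPool.Get().(*record)
	r.buf = append(r.buf[:0], payload...) // one huge payload grows r.buf for good
	// ... process r.buf ...
	recPool.Put(r) // the pool now retains the oversized backing array
}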
Outside of those situations, a good candidate for sync.Pool should satisfy the following:
- The object is a buffer or another non-composite type, such as a buffer reader, JSON decoder, or bufio writer
- The object's memory can be reused
In addition, keep the following in mind when using it:
- Re-initialize (reset) the object before Put, so that stale data cannot leak through and cause all kinds of subtle bugs (see the bytes.Buffer sketch after this list)
- Match every Get with exactly one Put
- Watch out for code that modifies the object's memory layout after Get
- Consider whether your workload is likely to cause contention on the pool
- sync.Pool is not a silver bullet; don't treat everything as a nail just because you are holding a hammer
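The first note is the one that bites most often in practice: reset the object on reuse so that data from a previous user cannot leak through. A common sketch with bytes.Buffer (bufferPool and render are made-up names; assumes the bytes and sync imports):

var bufferPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

func render(name string) string {
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset() // drop whatever the previous user left behind
	defer bufferPool.Put(buf)

	buf.WriteString("hello, ")
	buf.WriteString(name)
	return buf.String()
}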
(PS: My own understanding is limited; corrections for any mistakes or omissions are welcome!)
References
- sync: Pool example suggests incorrect usage