当前位置: 首页>移动开发>正文

你真的了解go中的pool吗?

为什么使用 pool

虽然 Go 是一门并发编程语言,但是在实际开发中,我们还是需要考虑并发安全问题。Go 是一个自动垃圾回收的语言,使用 Go 语言创建对象时,不需要手动释放内存。但是在并发编程中,我们需要考虑对象的复用问题。如果我们在并发编程中频繁的创建对象,那么就会频繁的触发垃圾回收,这样会影响程序的性能。
而且,Go 的垃圾回收机制还有一个问题,STW(stop-the-world,程序暂停),就是在垃圾回收的时候,会阻塞所有的 goroutine。这样会影响程序的性能。
并且,还有像数据库连接、TCP 连接等资源,这些资源的创建时一个比较耗时的操作,我们需要考虑资源的复用问题。
所以在并发编程中,我们需要考虑对象的复用问题。对象的复用可以减少垃圾回收的频率,提高程序的性能。

sync.Pool

sync.Pool 数据类型是一个对象池,它可以存储临时数据,减少内存分配,降低 GC 压力。sync.Pool 是一个非线程安全的对象池,它可以存储临时数据,减少内存分配,降低 GC 压力。
比如 fmt 包,它的实现中使用了 sync.Pool 对象池,它会使用 buffer池做输出缓冲,当大量的 goroutine 同时调用 fmt 包的输出函数时,sync.Pool 对象池可以减少内存分配,降低 GC 压力。
但是,需要注意的是,sync.Pool 对象池是一个非线程安全的对象池,它只能用于存储临时数据,不能用于存储共享数据。且不可在使用之后再复制使用。

sync.Pool 的使用方法

它只对外暴露了三个方法:New、Get 和 Put。

New

Pool struct 中有一个 New 方法New func() any,它的类型是一个函数类型,它的作用是当 Pool 中没有可用的对象时,会调用 New 方法创建一个新的对象。
需要注意的是,New 是可变的字段,这使得我们可以在运行时动态的修改 New 方法。但我们一般很少这样修改。

Get

该方法的作用是从 Pool 中获取一个对象。如果 Pool 中有可用的对象,那么就会返回一个对象。如果 Pool 中没有可用的对象,那么就会调用 New 方法创建一个新的对象。如果 New 方法返回的对象是 nil,那么 Get 方法会返回 nil。

Put

该方法的作用是将一个对象放回 Pool 中。如果 Pool 中的对象数量大于 Pool 中的最大对象数量,那么 Put 方法会丢弃这个对象,反之会将这个对象放回 Pool 中。但如果 Put 方法放回的对象是 nil,那么 Put 方法会丢弃这个对象。

源码解读

// Pool是一组临时对象,可以单独保存和检索。

// 存储在Pool中的任何项目可能在任何时候自动移除,无需通知。如果此时Pool是唯一的引用,该项目可能会被释放。

// Pool可安全供多个goroutine同时使用。

// Pool的目的是为了缓存已分配但未使用的项目以供以后重用,从而减轻GC压力。也就是说,它便于构建高效、线程安全的自由列表。但它不适用于所有的自由列表。

// Pool的适当用法是管理一组临时项目,这些项目在模块的并发独立客户端之间悄悄地共享并可能被重复利用。Pool提供了一种在许多客户端之间分摊分配开销的方法。

// Pool的良好用法示例是fmt包中维护的一个动态大小的临时输出缓冲区存储。在负荷下,该存储会增加(当许多goroutine正在打印时);在静止时会缩减。

// 另一方面,作为短暂对象的自由列表并不适合用作Pool的使用,因为在这种场景中开销不会很好地分摊。这样的对象实现它们自己的自由列表更有效。

// 在第一次使用之后,Pool不得被复制。

// 根据Go内存模型的术语,对Put(x)的调用“在返回相同值x的Get之前同步”。类似地,对New返回x的调用“在返回相同值x的Get之前同步”。
type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // 每个池本地的固定大小,实际类型为[P]poolLocal
    localSize uintptr        // 本地数组大小

    victim     unsafe.Pointer // 上一个周期的本地数据
    victimSize uintptr        // victim 数组的大小

    // New可以选择指定一个函数来生成一个值, 当Get返回nil的时候。
    // 在调用Get的同时不能并发地更改它。
    New func() any
}

pool 中核心的两个字段为 local 和 victim,它们都是 unsafe.Pointer 类型。local 是一个指向 poolLocal 的指针,poolLocal 是一个固定大小的数组,它的大小为 P,P 是 CPU 的核心数。victim 是一个指向上一个周期的 poolLocal 的指针。

func init() {
    runtime_registerPoolCleanup(poolCleanup)
}

可以看到,sync.Pool 在初始化的时候会调用 runtime_registerPoolCleanup 函数,它的作用是注册一个 cleanup 函数,这个 cleanup 函数会在 GC 的时候被调用。
函数 poolCleanup 的作用是清理 Pool 中的 victim,然后把 local 的数据给 victim。

func poolCleanup() {
    // 此函数在全局停止时被调用,在进行垃圾回收的时候。它不应当进行内存分配,可能也不应该调用任何运行时函数。
    // 由于全局停止,没有池用户可以处于被固定的状态(实际上所有的 Ps 都被固定了)。
    // 清除所有池中的 victim。
    for _, p := range oldPools {
        p.victim = nil
        p.victimSize = 0
    }

    // 移动 local 给 victim。
    for _, p := range allPools {
        p.victim = p.local
        p.victimSize = p.localSize
        p.local = nil
        p.localSize = 0
    }

    // 具有非空主缓存的池现在具有非空的 victim 缓存,而没有池具有主缓存。
    oldPools, allPools = allPools, nil
}

local 字段是一个指向 poolLocal 的指针,poolLocal 是一个固定大小的数组,它的大小为 P,P 是 CPU 的核心数。

type poolLocal struct {
    poolLocalInternal
    // 在广泛使用的平台上避免伪共享,其中 128 mod (缓存行大小) = 0。
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

其中包含 poolLocalInternal,它的定义如下:

// 本地 per-P Pool 附录。
type poolLocalInternal struct {
    private any       // 只能被当前的 P 读取
    shared  poolChain // 本地 P 可以 pushHead/popHead; 其它任意 P 可以 popTail.
}

poolLocalInternal 中包含两个字段,private 和 shared:

  • private 是一个私有的字段,只能被对应的 P 使用。
  • shared 是一个 poolChain,它是一个链表,它的作用是当 local 中的对象被用完了,可以从 shared 中获取对象。

Get 方法

// Get 可以从池中选择任意项目,将其从池中移除,并将其返回给调用者。
// Get 可以选择忽略池并将其视为空的情况。
// 调用者不应该假设传递给Put的值与Get返回值之间有任何关系。
// 如果 Get 返回 nil 并且 New 字段非 nil,则获取返回调用p.New的结果。
func (p *Pool) Get() any {
    if race.Enabled {
        race.Disable()
    }
    l, pid := p.pin() // 获取当前的 P 和 P 的 id
    x := l.private // 有先从 local 的 private 中获取对象
    l.private = nil
    if x == nil {
        // 尝试 pop 本地 shard 的头部。我们更喜欢头部而不是尾部,因为它可以重用时间局部性。
        x, _ = l.shared.popHead()
        if x == nil { // 如果 local 的 shard 中没有对象,那么就从 victim 中获取对象
            x = p.getSlow(pid)
        }
    }
    runtime_procUnpin()
    // 如果 x 是 nil 并且 p.New 不是 nil,那么就调用 p.New 方法创建一个新的对象
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

下面我们来看如何从 victim 中获取对象p.getSlow(pid)

func (p *Pool) getSlow(pid int) any {
    size := runtime_LoadAcquintptr(&p.localSize) 
    locals := p.local
    // 尝试从其他进程中窃取一个元素。
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i+1)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }
    // 尝试从主缓存中窃取一个元素。因为我们想让 victim 缓存尽可能地被淘汰。
    size = atomic.LoadUintptr(&p.victimSize)
    if uintptr(pid) >= size {
        return nil
    }
    locals = p.victim
    l := indexLocal(locals, pid)
    if x := l.private; x != nil {
        l.private = nil
        return x
    }
    for i := 0; i < int(size); i++ {
        l := indexLocal(locals, (pid+i)%int(size))
        if x, _ := l.shared.popTail(); x != nil {
            return x
        }
    }

    // Mark the victim cache as empty for future gets don't bother
    // with it.
    // 标记 victim 缓存为空,以便将来获取时无需再关心它。
    atomic.StoreUintptr(&p.victimSize, 0)

    return nil
}

注意,当前方法绑定在同一个 p 上运行,所以不需要锁。它首先尝试从其他进程中窃取一个元素,然后尝试从主缓存中窃取一个元素。如果都没有获取到对象,那么就返回 nil。

Put 方法

func (p *Pool) Put(x any) {
    if x == nil {
        return
    }
    if race.Enabled {
        if fastrandn(4) == 0 {
            // 随机将 x 丢弃。
            return
        }
        race.ReleaseMerge(poolRaceAddr(x))
        race.Disable()
    }
    l, _ := p.pin()
    if l.private == nil { // 如果 local 的 private 是 nil,那么就把 x 放到 local 的 private 中
        l.private = x
    } else { // 否则加入到本地队列中
        l.shared.pushHead(x)
    }
    runtime_procUnpin()
    if race.Enabled {
        race.Enable()
    }
}

Put 的逻辑相对简单,优先设置本地 private,如果 private 字段已经有值了,那么就把此元素 push 到本地队列中。

踩坑

内存泄漏

取出来的 bytes.Buffer 在使用的时候,我们可以往这个元素中增加大量的 byte 数据,这会导致底层的 byte slice 的容量可能会变得很大。这个时候,即使 Reset 再放回到池子中,这些 byte slice 的容量不会改变,所占的空间依然很大。而且,因为 Pool 回收的机制,这些大的 Buffer 可能不被回收,而是会一直占用很大的空间,这属于内存泄漏的问题。
可以增加检查逻辑,放回的超过一定大小的 buffer,就直接丢弃掉,不再放到池子中。

内存浪费

除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象。
要做到物尽其用,尽可能不浪费的话,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了。


https://www.xamrdz.com/mobile/44x1994736.html

相关文章: