Back
详情
详解golang协程池 gopool的实现原理

解析 Golang 协程池 gopool 设计与实现 - 掘金
我们知道协程比线程更加轻量化,因此在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。

协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,限制协程数量,从而提高协程执行效率的技术。

gopool 的目标是作为 go 关键字的一个可选方案,对外暴露了gopool.Go 这个函数,因此我们可以用gopool.Go代替关键字go:

go func() {
    // do your job
}()

gopool.Go(func(){
    /// do your job
})

核心实现

在go的源码中,gopool.Go 函数其实是对全局 defaultPool 变量的 CtxGo 方法的调用(golang 标准库中有很多函数也是类似的实现方式,比如 net/http 的 client 和 handler),所以如果要了解原理,就需要看 defaultPool 本身是什么。
从定义上看,defaultPool 是一个 Pool 接口的实例,由 NewPool 函数初始化,这个函数的功能很简单,它根据入参构造了一个 pool 结构,然后将这个结构返回。pool 结构是 Pool 接口的一个实现。

Pool接口

首先看一下接口Pool的定义:

type Pool interface {
    // 池子的名称
    Name() string
        
    // 设置池子内Goroutine的容量
    SetCap(cap int32)
        
    // 执行 f 函数
    Go(f func())
        
    // 带 ctx,执行 f 函数
    CtxGo(ctx context.Context, f func())
        
    // 设置发生panic时调用的函数
    SetPanicHandler(f func(context.Context, interface{}))
}

pool结构以及NewPool

再看下pool的结构:

type pool struct {
    // 协程池的名字,有 Name 方法可以返回
    name string

    // 这个协程池同时最多允许多少 worker 存在
    cap int32
  
    // 配置信息,目前只有一个阈值的属性,具体见下文
    config *Config
  
    // task 队列的元信息,每一个 task 代表一个待执行的函数
    taskHead  *task
    taskTail  *task
    taskLock  sync.Mutex
    taskCount int32

    // 当前有多少个 worker 在运行中,每个 worker 代表一个 goroutine
    workerCount int32

    // 由这个协程池中的协程引发的 panic 会由该函数处理
    panicHandler func(context.Context, interface{})
}

func NewPool(name string, cap int32, config *Config) Pool {
    p := &pool{
        name:   name,
        cap:    cap,
        config: config,
    }
    return p
}

调用 NewPool 获取了以 Pool 的形式返回的 pool 结构体

综合前一节 pool 的定义,我们可以看到,一个协程池 pool 对应了一组task

task结构

type task struct {
    ctx context.Context
    f   func()

    next *task
}

worker

一个 worker 就是逻辑上的一个执行器,它唯一对应到一个协程池 pool。当一个worker被唤起,将会开启一个goroutine ,不断地从 pool 中的 task链表获取任务并执行

type worker struct {
    pool *pool
}
func (w *worker) run() {
    go func() {
        for {
            // 声明即将执行的 task
            var t *task
                        
            // 操作 pool 中的 task 链表,加锁
            w.pool.taskLock.Lock()
            if w.pool.taskHead != nil {
                // 拿到 taskHead 准备执行
                t = w.pool.taskHead
                                
                // 更新链表的 head 以及数量
                w.pool.taskHead = w.pool.taskHead.next
                atomic.AddInt32(&w.pool.taskCount, -1)
            }
            
            // 如果前一步拿到的 taskHead 为空,说明无任务需要执行,清理后返回
            if t == nil {
                w.close()
                w.pool.taskLock.Unlock()
                w.Recycle()
                return
            }
            w.pool.taskLock.Unlock()
                        
            // 执行任务,针对 panic 会recover,并调用配置的 handler
            func() {
                defer func() {
                    if r := recover(); r != nil {
                        msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
                        logger.CtxErrorf(t.ctx, msg)
                        if w.pool.panicHandler != nil {
                            w.pool.panicHandler(t.ctx, r)
                        }
                    }
                }()
                t.f()
            }()
            t.Recycle()
        }
    }()
}

通过上面我们知道,如果把 pool 看作是一个可操作单元,那么它内部维护了一个 task 的队列(通过链表来实现),其中的每个 task 结构代表一个待执行的函数,除此之外,它还对应多个 worker,这些 worker 从 task 中获取函数并执行

总结来说,pool.CtxGo 方法是 task 的生产者,worker 则是 task 的消费者,两者的交互通过 task 链表来完成

pool.CtxGo

下面我们直接来看 pool.CtxGo 这个方法,它也是协程池的核心方法,它的定义是这样的:

func Go(f func()) {
    CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
    defaultPool.CtxGo(ctx, f)
}


func (p *pool) CtxGo(ctx context.Context, f func()) {
  // 从 taskPool 中取一个 task 结构体,通过复用结构体来减少 gc 压力
    t := taskPool.Get().(*task)
  
  // 使用入参来初始化 task 结构
    t.ctx = ctx
    t.f = f
  
  // 通过加锁将 task 并发安全地放在队列的尾部,并更新队列长度
    p.taskLock.Lock()
    if p.taskHead == nil {
        p.taskHead = t
        p.taskTail = t
    } else {
        p.taskTail.next = t
        p.taskTail = t
    }
    p.taskLock.Unlock()
    atomic.AddInt32(&p.taskCount, 1)
  
    // 满足条件时,从 workerPool 中取一个 worker 结构并在初始化后调用其 run 方法
    // 且当以下两个条件满足时,创建新的 worker 并唤起执行:
    // 1. task的数量超过了配置的限制 
    // 2. 当前运行的worker数量小于上限(或无worker运行)
    if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
        // worker数量+1
        p.incWorkerCount()
        // 创建一个新的worker,并把当前 pool 赋值
        w := workerPool.Get().(*worker)
        w.pool = p
        w.run()
    }
}

通过上述代码我们知道,gopool 会自行维护一个 defaultPool,这是一个默认的 pool 结构体,在引入包的时候就进行初始化当我们直接调用 gopool.CtxGo() 时,本质上是调用了 defaultPool 的同名方法

func init() {
    defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
    defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
    // 控制扩容的门槛,一旦待执行的 task 超过此值,且 worker 数量未达到上限,就开始启动新的 worker
    ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
    c := &Config{
        ScaleThreshold: defaultScalaThreshold,
    }
    return c
}

当我们调用 CtxGo时,gopool 就会更新维护的任务链表,并且判断是否需要扩容 worker

若此时已经有很多 worker 启动(底层一个 worker 对应一个 goroutine),不需要扩容,就直接返回。
若判断需要扩容,就创建一个新的worker,并调用 worker.run()方法启动,各个worker会异步地检查 pool 里面的任务链表是否还有待执行的任务,如果有就执行

task、worker和pool的关系

  • task 是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;
  • worker 是一个实际执行任务的执行器,它会异步启动一个 goroutine 执行协程池里面未执行的task
  • pool 是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的 worker

性能优化

gopool的作者应用了多次 sync.Pool 来池化对象的创建,复用woker和task对象。
task 池化

var taskPool sync.Pool

func init() {
    taskPool.New = newTask
}

func newTask() interface{} {
    return &task{}
}

func (t *task) Recycle() {
    t.zero()
    taskPool.Put(t)
}

worker 池化

var workerPool sync.Pool

func init() {
    workerPool.New = newWorker
}

func newWorker() interface{} {
    return &worker{}
}

func (w *worker) Recycle() {
    w.zero()
    workerPool.Put(w)
}