当前位置: 首页>后端>正文

grpc onnext控制下一次调用 grpc context cancel

概述

context包是golang1.6开始提供的上下文包,golang1.7移入到标准库。对于context 只是在使用grpc用到过,但是并没有真正去在自己程序使用过,并不了解其中的使用目的。必须和Context做个了断了。

主要作用

主要为了解决多个goroutine 多链路相互嵌套无法终止问题(goroutine泄露问题),以及上下文数据共享问题。其实主要还解决goroutine终止问题,一般上下文数据共享用的少。

实现

实现方式是通过超时时间、截止时间、手动 等方式进行chancel的发送阻塞关闭,完成信号量传递,实现goroutine 终止。其源码实现是context通过手动或自动调用cancel方法发送信号,然后子goroutine进行调用Done方法接收信号,具体实现请查看Context源码分析

grpc onnext控制下一次调用 grpc context cancel,grpc onnext控制下一次调用 grpc context cancel_golang,第1张

Context包源码分析

常用方法

主要方法使用有四个

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
  • WithCancel 手动发送终止信息 需要手动调用CancelFunc方法
  • WithDeadline 传入一个精确截止时间比如(2021-8-2) 到达这个时间自动发送终止信号
  • WithTimeout 传入一个倒计时时间比如100s, 过了100s会自动发送终止信号
  • WithValue 传入 key 和value 值,方便上下文数据共享

Context 接口

上面四个方法中必须都传人Context ,而Context是一套interface,Context 定义一系列接口:

//多个goroutine可以同时调用上下文的方法。
type Context interface {
  Deadline() (deadline time.Time, ok bool)
  Done() <-chan struct{}
  Err() error
  Value(key interface{}) interface{}
}
  • Deadline 返回一个结束的时间点,ok表示是否存在结束时间点
  • Done Context 终止信号,其信号传递方式是 chancel实现
  • Err 返回Context结束的原因,只会在Done 返回时才会返回

    - 当手动终止取消,返回Canceled错误

    - 当超时终止取消,返回DeadlineExceeded错误

  • Value 是在使用WithValue方法的时候才会用到,一般是上级通过WithValue 方式传入KV,Value通过K进行取值。

根Context

如何使用Context interface方法呢?需要使用 初始化一个根Context,通过常用那四种With方法内部实现Context interface 然后进行Context上下文传递。

根Context 有两种实现:

func Background() Context
func TODO() Context
  • Background 返回一个空Context,通常使用在主函数进行初始化使用,一般作为顶级的Context使用。它没有过期时间、不能取消、没有值
  • TODO 也是返回一个 空Context,当不清楚要使用哪个 Context的时候使用

其中源码为:

type emptyCtx int
....
var (
  background = new(emptyCtx)
  todo       = new(emptyCtx)
)
func Background() Context {
  return background
}

func TODO() Context {
  return todo
}

都是返回一个空的emptyCtx 的Context。

四种context

有四种context来应对不同的使用:

分别为:emptyCtxcancelCtxtimerCtxvalueCtx

这个四种Context 都是实现Context interface,方便调用

emptyCtx

emptyCtx 是一个空的Context,函数返回都是空,主要用在Background上

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
  return
}

func (*emptyCtx) Done() <-chan struct{} {
  return nil
}

func (*emptyCtx) Err() error {
  return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
  return nil
}

cancelCtx

cancelCtx是用在WithCancel方法上的,在WithCancel方法中会通过newCancelCtx(Context)进行创建一个cancelCtx Context,主goroutine然后调用cancelCtx 的cancel()方法进行发送终止信号,子goroutine通过cancelCtx的Done()方法进行结束终止信号

type cancelCtx struct {
  Context

  mu       sync.Mutex            // protects following fields
  done     chan struct{}         // created lazily, closed by first cancel call
  children map[canceler]struct{} // set to nil by the first cancel call
  err      error                 // set to non-nil by the first cancel call
}

func (c *cancelCtx) Value(key interface{}) interface{} {
  if key == &cancelCtxKey {
    return c
  }
  return c.Context.Value(key)
}

func (c *cancelCtx) Done() <-chan struct{} {
  c.mu.Lock()
  if c.done == nil {
    c.done = make(chan struct{})
  }
  d := c.done
  c.mu.Unlock()
  return d
}

func (c *cancelCtx) Err() error {
  c.mu.Lock()
  err := c.err
  c.mu.Unlock()
  return err
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  if err == nil {
    panic("context: internal error: missing cancel error")
  }
  c.mu.Lock()
  if c.err != nil {
    c.mu.Unlock()
    return // already canceled
  }
  c.err = err
  if c.done == nil {
    c.done = closedchan
  } else {
    close(c.done)
  }
  for child := range c.children {
    // NOTE: acquiring the child's lock while holding parent's lock.
    child.cancel(false, err)
  }
  c.children = nil
  c.mu.Unlock()

  if removeFromParent {
    removeChild(c.Context, c)
  }
}

timerCtx

timerCtx是在WithDeadline、WithTimeout方法上使用的。这两个方法用法都几乎一样都是通过时间到达进行自动发送终止信号。而timerCtx Context 是通过定时器 time.AfterFunc 进行实现的,可以查看WithDeadline源码

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
  if parent == nil {
    panic("cannot create context from nil parent")
  }
  if cur, ok := parent.Deadline(); ok && cur.Before(d) {
    // The current deadline is already sooner than the new one.
    return WithCancel(parent)
  }
  c := &timerCtx{
    cancelCtx: newCancelCtx(parent),
    deadline:  d,
  }
  propagateCancel(parent, c)
  dur := time.Until(d)
  if dur <= 0 {
    c.cancel(true, DeadlineExceeded) // deadline has already passed
    return c, func() { c.cancel(false, Canceled) }
  }
  c.mu.Lock()
  defer c.mu.Unlock()
  if c.err == nil {
    c.timer = time.AfterFunc(dur, func() {
      c.cancel(true, DeadlineExceeded)
    })
  }
  return c, func() { c.cancel(true, Canceled) }
}
type timerCtx struct {
  cancelCtx
  timer *time.Timer // Under cancelCtx.mu.

  deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
  return c.deadline, true
}

func (c *timerCtx) String() string {
  return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
    c.deadline.String() + " [" +
    time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
  c.cancelCtx.cancel(false, err)
  if removeFromParent {
    // Remove this timerCtx from its parent cancelCtx's children.
    removeChild(c.cancelCtx.Context, c)
  }
  c.mu.Lock()
  if c.timer != nil {
    c.timer.Stop()
    c.timer = nil
  }
  c.mu.Unlock()
}

valueCtx

valueCtx它主要用在WithValue方法上的,WithValue 是进行上下文数据共享使用。

同一Context 进行数据共享,相同key情况下子级Context 会覆盖父Context 的value

type valueCtx struct {
  Context
  key, val interface{}
}

func (c *valueCtx) String() string {
  return contextName(c.Context) + ".WithValue(type " +
    reflectlite.TypeOf(c.key).String() +
    ", val " + stringify(c.val) + ")"
}

func (c *valueCtx) Value(key interface{}) interface{} {
  if c.key == key {
    return c.val
  }
  return c.Context.Value(key)
}

对四种context简单总结:

  1. 都是实现了Context 接口
  2. emptyCtx是一个空的context ,用于初始化根Context
  3. cancelCtx 内部实现发送终止信号发送的Context,主要在cancel方法
  4. timerCtx 是截止时间和超时进行发送的Context,而且继承了cancelCtx,其真正是调用cancelCtx的cancel方法
  5. valueCtx是用在上下文数据共享的Context

信号量的发送和接收

就拿WithCancel来说,WithCancel的返回是一个Context interface和CancelFunc,CancelFunc 是个函数类型,CancelFunc函数一旦调用就会进行终止信号发送。

对应CancelFunc 函数的方法是cancel()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)  {
  if parent == nil {
    panic("cannot create context from nil parent")
  }
  c := newCancelCtx(parent)
  propagateCancel(parent, &c)
  return &c, func() { c.cancel(true, Canceled) }
}

主要还是要看这两个接口,cancelCtx和timerCtx 这个两个Context 都实现了这个两个方法

type canceler interface {
  cancel(removeFromParent bool, err error) //发送信号
  Done() <-chan struct{} //接收信号 读取chan
}

主要看一下cancelCtx 的cancel方法实现:

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
  if err == nil {
    panic("context: internal error: missing cancel error")
  }
  c.mu.Lock()
  if c.err != nil {
    c.mu.Unlock()
    return // already canceled
  }
  c.err = err
  if c.done == nil {
    c.done = closedchan  //给chan发送一个空的struct:var closedchan = make(chan struct{})
  } else {
    close(c.done) 
  }
  for child := range c.children {
    // NOTE: acquiring the child's lock while holding parent's lock.
    child.cancel(false, err) //关闭子Context
  }
  c.children = nil
  c.mu.Unlock()

  if removeFromParent {
    removeChild(c.Context, c)
  }
}



https://www.xamrdz.com/backend/3yh1922013.html

相关文章: