Context 接口

对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancelWithDeadlineWithTimeoutWithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。

context.Context是一个接口,该接口定义了四个需要实现的方法

1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}

可以看到Context是一个interface,在golang里面,interface是一个使用非常广泛的结构,它可以接纳任何类型。Context定义很简单,一共4个方法,我们需要能够很好的理解这几个方法

  1. Deadline方法是获取设置的截止时间的意思,第一个返回式是截止时间,到了这个时间点,Context会自动发起取消请求;第二个返回值ok==false时表示没有设置截止时间,如果需要取消的话,需要调用取消函数进行取消。
  2. Done方法返回一个只读的chan,类型为struct{},我们在goroutine中,如果该方法返回的chan可以读取,则意味着parent context已经发起了取消请求,我们通过Done方法收到这个信号后,就应该做清理操作,然后退出goroutine,释放资源。之后,Err 方法会返回一个错误,告知为什么 Context 被取消。
  3. Err方法返回当前 context 结束的原因,只会在Done返回的Chan被关闭时才会返回非空的值。
    • 如果当前 Context 被取消就会返回 Canceled 的错误
    • 如果当前 Context 超时就会返回 DeadlineExceeded 错误
  4. Value方法获取该Context上绑定的值,是一个键值对,所以要通过一个Key才可以获取对应的值,这个值一般是线程安全的。对于同一个上下文来说,多次调用Value 并传入相同的Key会返回相同的结果,该方法仅用于传递跨API和进程间跟请求域的数据

源码提供的四个 Context 实现

  1. emptyCtx 完全空的 Context ,实现的函数也都是返回nil,仅仅是实现了 Context 的接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // emptyCtx永远不会被取消,没有值,也没有截止日期。It is not
    // struct{}, 因为这种类型的变量必须有不同的地址。
    type emptyCtx int

    func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
    }

    func (*emptyCtx) Done() <-chan struct{} {
    return nil
    }

    func (*emptyCtx) Err() error {
    return nil
    }

    func (*emptyCtx) Value(key interface{}) interface{} {
    return nil
    }

    func (e *emptyCtx) String() string {
    switch e {
    case background:
    return "context.Background"
    case todo:
    return "context.TODO"
    }
    return "unknown empty Context"
    }
  1. cancelCtx 继承自 Context ,同时也实现了 canceler 接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    // cancelCtx可以被取消。当取消时,它也取消了所有的子节点
    // 实现canceler。
    type cancelCtx struct {
    Context

    mu sync.Mutex // protects following fields
    done chan struct{} // created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err error // set to non-nil by the first cancel call
    }

    func (c *cancelCtx) Done() <-chan struct{} {
    c.mu.Lock()
    if c.done == nil {
    c.done = make(chan struct{})
    }
    d := c.done
    c.mu.Unlock()
    return d
    }

    func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
    }

    type stringer interface {
    String() string
    }

    func contextName(c Context) string {
    if s, ok := c.(stringer); ok {
    return s.String()
    }
    return reflectlite.TypeOf(c).String()
    }

    func (c *cancelCtx) String() string {
    return contextName(c.Context) + ".WithCancel"
    }

    // cancel closes c.done, cancels each of c's children, and, if
    // removeFromParent is true, removes c from its parent's children.
    //核心是关闭c.done
    //同时会设置c.err = err, c.children = nil
    //依次遍历c.children,每个child分别cancel
    //如果设置了removeFromParent,则将c从其parent的children中删除
    func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
    panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
    c.mu.Unlock()
    return // already canceled
    }
    c.err = err
    if c.done == nil {
    c.done = closedchan
    } else {
    close(c.done)
    }
    for child := range c.children {
    // NOTE: acquiring the child's lock while holding parent's lock.
    child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
    removeChild(c.Context, c)
    }
    }

    可以看到,所有的children都存在一个map中;Done方法会返回其中的done channel, 而另外的cancel方法会关闭Done channel并且逐层向下遍历,关闭children的channel,并且将当前canceler从parent中移除

  2. timerCtx 继承自 cancelCtx ,增加了 timeout 机制

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
    // implement Done and Err. It implements cancel by stopping its timer then
    // delegating to cancelCtx.cancel.
    type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
    }

    func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
    }

    func (c *timerCtx) String() string {
    return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
    c.deadline.String() + " [" +
    time.Until(c.deadline).String() + "])"
    }

    func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
    // Remove this timerCtx from its parent cancelCtx's children.
    removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
    c.timer.Stop()
    c.timer = nil
    }
    c.mu.Unlock()
    }
  1. valueCtx 存储键值对的数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // A valueCtx carries a key-value pair. It implements Value for that key and
    // delegates all other calls to the embedded Context.
    type valueCtx struct {
    Context
    key, val interface{}
    }

    // stringify tries a bit to stringify v, without using fmt, since we don't
    // want context depending on the unicode tables. This is only used by
    // *valueCtx.String().
    func stringify(v interface{}) string {
    switch s := v.(type) {
    case stringer:
    return s.String()
    case string:
    return s
    }
    return "<not Stringer>"
    }

    func (c *valueCtx) String() string {
    return contextName(c.Context) + ".WithValue(type " +
    reflectlite.TypeOf(c.key).String() +
    ", val " + stringify(c.val) + ")"
    }

    func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
    return c.val
    }
    return c.Context.Value(key)
    }

Background()和TODO()

  1. Background,主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context,它不能被取消。它是一个emptyCtx的实例。可以认为所有的Context是树的结构,Background是树的根,当任一Context被取消的时候,那么继承它的Context 都将被回收。
  2. TODO,如果我们不知道该使用什么Context的时候,可以使用这个,但是实际应用中,暂时还没有使用过这个TODO

他们两个本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

// Background返回一个非空的上下文。它从来没有被取消过,没有
// 值,并且没有最后期限。它通常由主函数使用,
// 初始化和测试,并作为传入的顶级上下文
// 请求。
func Background() Context {
return background
}

// TODO返回一个非nil的空上下文。代码应该使用上下文。待办事项时
// 不清楚使用哪个上下文,或者还没有可用的上下文(因为
// 还没有扩展到接受上下文参数)。
func TODO() Context {
return todo
}

With 系列函数

WithCancel

初始化一个cancelCtx的同时,还执行了propagateCancel方法,最后返回一个cancel function

传递一个父Context作为参数,返回子Context,以及一个取消函数用来取消Context

WithCancel返回带有新Done通道的父节点的副本。当调用返回的cancel函数或当关闭父上下文的Done通道时,将关闭返回上下文的Done通道,无论先发生什么情况。

取消此上下文将释放与其关联的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel。

1
2
3
4
5
6
7
8
9
10
11
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

WithDeadline

和WithCancel差不多,它会多传递一个截止时间参数,意味着到了这个时间点,会自动取消Context,当然我们也可以不等到这个时候,可以提前通过取消函数进行取消

取消此上下文将释放与其关联的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

WithTimeout

WithTimeout和WithDeadline基本上一样,这个表示是超时自动取消,是多少时间后自动取消Context的意思。

1
2
3
4
5
6
7
8
9
10
11
12
13
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

WithValue

WithValue函数能够将请求作用域的数据与 Context 对象建立关系

WithValue返回父节点的副本,其中与key关联的值为val。

仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。

所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口。

WithValue函数和取消Context无关,它是为了生成一个绑定了一个键值对数据的Context,这个绑定的数据可以通过Context.Value方法访问到,这是我们实际用经常要用到的技巧,一般我们想要通过上下文来传递数据时,可以通过这个方法,如我们需要tarce追踪系统调用栈的时候。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

propagateCancel 方法

propagateCancel 的含义就是传递cancel,从当前传入的parent开始(包括该parent),向上查找最近的一个可以被cancel的parent, 如果找到的parent已经被cancel,则将方才传入的child树给cancel掉,否则,将child节点直接连接为找到的parent的children中(Context字段不变,即向上的父亲指针不变,但是向下的孩子指针变直接了); 如果没有找到最近的可以被cancel的parent,即其上都不可被cancel,则启动一个goroutine等待传入的parent终止,则cancel传入的child树,或者等待传入的child终结。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// propagateCancel arranges for child to be canceled when parent is.
// 当父进程被取消时,propagateCancel会安排子进程被取消
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

Context 使用原则和技巧

  • 推荐以参数的方式显示传递Context,不要放到结构体中;parent Context一般为Background
  • 应该把Context作为第一个参数传递给入口请求和出口请求链路上的每一个函数,放在第一位,变量名建议都统一,如ctx
  • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO()
  • Context的Value相关方法应该传递请求域的必要数据,不应该用于传递可选参数
  • Context是线程安全的,可以放心的在多个goroutine中传递
  • 可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。

常用方法示例

  1. WithCancel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func gen(ctx context.Context) <-chan int {
    dst := make(chan int)
    n := 1
    go func() {
    for {
    select {
    case <-ctx.Done():
    return // return结束该goroutine,防止泄露
    case dst <- n:
    n++
    }
    }
    }()
    return dst
    }
    func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // 当我们取完需要的整数后调用cancel

    for n := range gen(ctx) {
    fmt.Println(n)
    if n == 5 {
    break
    }
    }
    }

    上面的示例代码中,gen函数在单独的goroutine中生成整数并将它们发送到返回的通道。 gen的调用者在使用生成的整数之后需要取消上下文,以免gen启动的内部goroutine发生泄漏。

  2. WithDeadline

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
    // 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
    fmt.Println("overslept")
    case <-ctx.Done():
    fmt.Println(ctx.Err())
    }
    }

    上面的代码中,定义了一个50毫秒之后过期的deadline,然后我们调用context.WithDeadline(context.Background(), d)得到一个上下文(ctx)和一个取消函数(cancel),然后使用一个select让主程序陷入等待:等待1秒后打印overslept退出或者等待ctx过期后退出。 因为ctx50秒后就过期,所以ctx.Done()会先接收到值,上面的代码会打印ctx.Err()取消原因

  3. WithTimeout

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    package main

    import (
    "context"
    "fmt"
    "sync"

    "time"
    )

    // context.WithTimeout

    var wg sync.WaitGroup

    func worker(ctx context.Context) {
    LOOP:
    for {
    fmt.Println("db connecting ...")
    time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
    select {
    case <-ctx.Done(): // 50毫秒后自动调用
    break LOOP
    default:
    }
    }
    fmt.Println("worker done!")
    wg.Done()
    }

    func main() {
    // 设置一个50毫秒的超时
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
    wg.Add(1)
    go worker(ctx)
    time.Sleep(time.Second * 5)
    cancel() // 通知子goroutine结束
    wg.Wait()
    fmt.Println("over")
    }
  4. WithValue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package main

    import (
    "context"
    "fmt"
    "sync"

    "time"
    )

    // context.WithValue

    type TraceCode string

    var wg sync.WaitGroup

    func worker(ctx context.Context) {
    key := TraceCode("TRACE_CODE")
    traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace code
    if !ok {
    fmt.Println("invalid trace code")
    }
    LOOP:
    for {
    fmt.Printf("worker, trace code:%s\n", traceCode)
    time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒
    select {
    case <-ctx.Done(): // 50毫秒后自动调用
    break LOOP
    default:
    }
    }
    fmt.Println("worker done!")
    wg.Done()
    }

    func main() {
    // 设置一个50毫秒的超时
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
    // 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合
    ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")
    wg.Add(1)
    go worker(ctx)
    time.Sleep(time.Second * 5)
    cancel() // 通知子goroutine结束
    wg.Wait()
    fmt.Println("over")
    }

参考链接:

  1. https://studygolang.com/static/pkgdoc/
  2. https://www.liwenzhou.com/posts/Go/go_context/
  3. https://www.jianshu.com/p/e5df3cd0708b
  4. https://blog.csdn.net/u011957758/article/details/82948750