Go Context 详解

Context 最初的印象是,在 API 项目代码开发中,从中间件处理前序逻辑、 Controller/Action 接收 HTTP 请求开始,所有后续调用都要传递的第一个参数,可以从中获取到客户端的信息、或者临时存储后续逻辑依赖的全局变量(比如Auth中间件解析出的用户id等),另外也可在异常问题排查时,用于全链路的问题排查跟踪。

然而这些功能,其实是各类网络框架比如 gin 等封装后的功能 , Golang 在 1.7 版本开始引入的 context.Context 标准包,其最初目的是增强 Golang 开发中的并发控制技术。

一、Context 解决的问题

在并发程序中,由于超时、取消操作或者一些异常情况,往往需要进行抢占操作或者中断后续操作。熟悉channel的朋友应该都见过使用done channel来处理此类问题。比如以下这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func main() {    
messages := make(chan int10)    
done := make(chan bool)    

defer close(messages)    
// consumer    
go func() {        
ticker := time.NewTicker(1 * time.Second)        
for _ = range ticker.C {            
select {            
case <-done:                
fmt.Println("child process interrupt...")                
return            
default:                
fmt.Printf("send message: %d\n", <-messages)            
}        
}    
}()    

// producer    
for i := 0; i < 10; i++ {        
messages <- i    
}    
time.Sleep(5 * time.Second)    
close(done)    
time.Sleep(1 * time.Second)    
fmt.Println("main process exit!")
}

使用这个方式,可以通过让子协程监听 done channel 来实现子协程的退出控制。虽然看似简明,但是局限性非常明显,试想一个场景:

当一个网络请求开始时,可能由此请求产生出多个、多层级的 goroutine ,但是本质上来讲每个层级的 goroutine 都是平行调度使用,不存在 goroutine “父子” 关系 , 当其中一个 goroutine 执行的任务被取消了或者处理超时了,那么其他被启动起来的Goroutine 都应该迅速退出,另外多个多层的 goroutine 想传递请求域的数据该如何处理?

如果继续使用监听 done channel 来层层控制,整个代码逻辑将变得非常复杂难以维护,所以我们需要一种优雅的解决方案:

  • 上层任务取消后,所有的下层任务都会被取消;
  • 中间某一层的任务取消后,只会将当前任务的下层任务取消,而不会影响上层的任务以及同级任务。

这就是 Context 要解决的问题和实现的目标。

二、Context 的定义

1、Context接口结构
1
2
3
4
5
6
type Context interface {    
Deadline() (deadline time.Time, ok bool)    
Done() <-chan struct{}    
Err() error    
Value(key interface{}) interface{}
}

Context接口包含四个方法:

  • Deadline()返回绑定当前context的任务被取消的截止时间;如果没有设定期限,将返回ok == false
  • Done() 当绑定当前context的任务被取消时,将返回一个关闭的channel;如果当前context不会被取消,将返回nil
  • Err() 如果Done()返回的channel没有关闭,将返回nil;如果Done()返回的channel已经关闭,将返回非空的值表示任务结束的原因。如果是context被取消,Err()将返回Canceled;如果是context超时,Err()将返回DeadlineExceeded
  • Value() 返回context存储的键值对中当前key对应的值,如果没有对应的key,则返回nil

可以看到 Done() 方法返回的 channel 正是用来传递结束信号以抢占并中断当前任务;Deadline()方法指示一段时间后当前goroutine是否会被取消;以及一个Err()方法,来解释goroutine被取消的原因(取消/超时);而Value()则用于获取特定于当前任务树的额外信息。而context所包含的额外信息键值对可以想象一颗树,树的每个节点可能携带一组键值对,如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点。

2、Context 设计原理

在 Goroutine 构成的树形结构中对信号进行同步以减少计算资源的浪费是 context.Context 的最大作用。Go 服务的每一个请求都是通过单独的 Goroutine 处理的,HTTP/RPC 请求的处理器会启动新的 Goroutine 访问数据库和其他服务。

如下图所示,我们可能会创建多个 Goroutine 来处理一次请求,而 context.Context的作用是在不同 Goroutine 之间同步请求特定数据、取消信号以及处理请求的截止日期。每一个 context.Context都会从最顶层的 Goroutine 一层一层传递到最下层。 context.Context 可以在上层 Goroutine 执行出现错误时,将信号及时同步给下层。

当最上层的 Goroutine 因为某些原因执行失败时,下层的 Goroutine 由于没有接收到这个信号所以会继续工作:

但是当我们正确地使用 context.Context时,就可以在下层及时停掉无用的工作以减少额外资源的消耗:

三、Context 的常用结构

1、默认上下文:emptyCtx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

context.emptyCtx 通过空方法实现了 context.Context 接口中的所有方法,但是没有超时时间,不能取消,也不能存储任何额外信息,所以emptyCtx 用来作为 context 树的根节点。

一般不会直接使用emptyCtx,而是使用由emptyCtx实例化的两个变量,分别可以通过调用BackgroundTODO方法得到,但这两个context在实现上是一样的:

1
2
3
4
5
6
7
8
9
10
11
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}

func TODO() Context {
return todo
}

两者的区别是:

  • context.Background 是上下文的默认值,所有其他的上下文都应该从它衍生出来;
  • context.TODO 应该仅在不确定应该使用哪种上下文时使用;

Context 的层级关系:

2、传值方法:valueCtx
1
2
3
4
5
6
7
8
9
10
11
type valueCtx struct {    
Context    
key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {    
if c.key == key {        
return c.val    
}    
return c.Context.Value(key)
}
  • valueCtx 利用一个 Context 类型的变量来表示父节点 context,所以当前context继承了父context的所有信息;
  • valueCtx类型还携带一组键值对,也就是说这种context可以携带额外的信息。

valueCtx实现了Value方法,用以在context链路上获取key对应的值,如果当前context上不存在需要的key,会沿着context链向上寻找key对应的值,直到根节点。

向 Context 添加键值对的方法:context.WithValue()

1
2
3
4
5
6
7
8
9
func WithValue(parent Context, key, val interface{}) Context {    
if key == nil {        
panic("nil key")    
}    
if !reflect.TypeOf(key).Comparable() {        
panic("key is not comparable")    
}    
return &valueCtx{parent, key, val}
}

如上述代码,context.WithValue() 并非直接向 context 对象添加键值对(原生Context也不支持),而是通过创建 valueCtx 的方法来将键值对添加绑定到子节点上。

3、取消信号:cancelCtx
1
2
3
4
5
6
7
8
9
10
11
12
type cancelCtx struct {    
Context    
mu       sync.Mutex            // protects following fields    
done     chan struct{}         // created lazily, closed by first cancel call    
children map[canceler]struct{} // set to nil by the first cancel call    
err      error                 // set to non-nil by the first cancel call
}

type canceler interface {    
cancel(removeFromParent bool, err error)    
Done() <-chan struct{}
}

valueCtx类似,cancelCtx中也有一个context变量作为父节点;变量done表示一个channel,用来表示传递关闭信号;children表示一个map,存储了当前context节点下的子节点;err用于存储错误信息表示任务结束的原因。

其方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (c *cancelCtx) Done() <-chan struct{} {    
c.mu.Lock()    
if c.done == nil {        
c.done = make(chan struct{})   
}    
d := c.done    
c.mu.Unlock()    
return d
}

func (c *cancelCtx) Err() error {    
c.mu.Lock()    
err := c.err    
c.mu.Unlock()    
return err
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {    
if err == nil {        
panic("context: internal error: missing cancel error")    
}    
c.mu.Lock()    
if c.err != nil {        
c.mu.Unlock()        
return // already canceled    
}    
// 设置取消原因    
c.err = err    
//设置一个关闭的channel或者将done channel关闭,用以发送关闭信号    
if c.done == nil {        
c.done = closedchan    
else {        
close(c.done)    
}    
// 将子节点context依次取消    
for child := range c.children {        
// NOTE: acquiring the child's lock while holding parent's lock.        
child.cancel(false, err)    
}    
c.children = nil    
c.mu.Unlock()    
if removeFromParent {        
// 将当前context节点从父节点上移除        
removeChild(c.Context, c)    
}
}

可以发现cancelCtx类型变量其实也是canceler类型,因为cancelCtx实现了canceler接口。
Done方法和Err方法没必要说了,cancelCtx类型的context在调用cancel方法时会设置取消原因,将done channel设置为一个关闭channel或者关闭channel,然后将子节点context依次取消,如果有需要还会将当前节点从父节点上移除。

WithCancel 函数用来创建一个可取消的context,即cancelCtx类型的context

  • context.newCancelCtx 将传入的上下文包装成私有结构体 context.cancelCtx
  • context.propagateCancel 会构建父子上下文之间的关联,当父上下文被取消时,子上下文也会被取消:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {    
c := newCancelCtx(parent)    
propagateCancel(parent, &c)    
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {    
// 将parent作为父节点context生成一个新的子节点    
return cancelCtx{Context: parent}
}

func propagateCancel(parent Context, child canceler) {    
if parent.Done() == nil {        
// parent.Done()返回nil表明父节点以上的路径上没有可取消的context        
return // parent is never canceled    
}    

// 获取最近的类型为cancelCtx的祖先节点    
if p, ok := parentCancelCtx(parent); ok {        
p.mu.Lock()        
if p.err != nil {            
// parent has already been canceled            
child.cancel(false, p.err)        
else {            
if p.children == nil {                
p.children = make(map[canceler]struct{})            
}            
// 将当前子节点加入最近cancelCtx祖先节点的children中            
p.children[child] = struct{}{}        
}        
p.mu.Unlock()    
else {        
go func() {            
select {            
case <-parent.Done():                
child.cancel(false, parent.Err())            
case <-child.Done():            
}        
}()    
}
}

func parentCancelCtx(parent Context) (*cancelCtx, bool) {    
for {        
switch c := parent.(type) {        
case *cancelCtx:            
return c, true        
case *timerCtx:            
return &c.cancelCtx, true        
case *valueCtx:            
parent = c.Context        
default:            
return nilfalse        
}    
}
}

之前说到cancelCtx取消时,会将后代节点中所有的cancelCtx都取消,propagateCancel即用来建立当前节点与祖先节点这个取消关联逻辑。

  • 如果parent.Done()返回nil,表明父节点以上的路径上没有可取消的context,不需要处理;
  • 如果在context链上找到到cancelCtx类型的祖先节点,则判断这个祖先节点是否已经取消,如果已经取消就取消当前节点;否则将当前节点加入到祖先节点的children列表。
  • 否则开启一个协程,监听parent.Done()child.Done(),一旦parent.Done()返回的channel关闭,即context链中某个祖先节点context被取消,则将当前context也取消。
4、定时取消:timerCtx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type timerCtx struct {    
cancelCtx    
timer *time.Timer // Under cancelCtx.mu.    

deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {    
return c.deadline, true
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {    
//将内部的cancelCtx取消    
c.cancelCtx.cancel(false, err)    
if removeFromParent {        
// Remove this timerCtx from its parent cancelCtx's children.        
removeChild(c.cancelCtx.Context, c)    
}    
c.mu.Lock()    
if c.timer != nil {        
//取消计时器        
c.timer.Stop()        
c.timer = nil    
}    
c.mu.Unlock()
}

timerCtx内部使用cancelCtx实现取消,另外使用定时器timer和过期时间deadline实现定时取消的功能。timerCtx在调用cancel方法,会先将内部的cancelCtx取消,如果需要则将自己从cancelCtx祖先节点上移除,最后取消计时器。

context 包的另外两个函数:context.WithDeadline()context.WithTimeout() 都可用于创建 context.timerCtx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // 已经过了截止日期
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

context.WithDeadline 在创建 context.timerCtx 的过程中判断了父上下文的截止日期与当前日期,并通过 time.AfterFunc 创建定时器,当时间超过了截止日期后会调用 context.timerCtx.cancel 同步取消信号。

四、Context 在 HTTP 框架中的使用

(代码实现详情分析见参考文档2:深入理解Golang之context

net/http 包中的 http server 实现,就是用了 context

1、首先Server在开启服务时会创建一个valueCtx,存储了server的相关信息,之后每建立一条连接就会开启一个协程,并携带此valueCtx

2、建立连接之后会基于传入的context创建一个valueCtx用于存储本地地址信息,之后在此基础上又创建了一个cancelCtx,然后开始从当前连接中读取网络请求,每当读取到一个请求则会将该cancelCtx传入,用以传递取消信号。一旦连接断开,即可发送取消信号,取消所有进行中的网络请求。

3、读取到请求之后,会再次基于传入的context创建新的cancelCtx,并设置到当前请求对象req上,同时生成的response对象中cancelCtx保存了当前context取消方法。

这样处理的目的主要有以下几点:

  • 一旦请求超时,即可中断当前请求;
  • 在处理构建response过程中如果发生错误,可直接调用response对象的cancelCtx方法结束当前请求;
  • 在处理构建response完成之后,调用response对象的cancelCtx方法结束当前请求。

在整个server处理流程中,使用了一条context链贯穿ServerConnectionRequest,不仅将上游的信息共享给下游任务,同时实现了上游可发送取消信号取消所有下游任务,而下游任务自行取消不会影响上游任务。

五、面试题:Context携带数据是线程安全的吗?为什么?

先说答案,context本身就是线程安全的,所以context携带value也是线程安全的。

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main()  {
ctx := context.WithValue(context.Background(), "myKey", "a1")
go func() {
for {
_ = context.WithValue(ctx, "myKey", "a2")
}
}()
go func() {
for {
_ = context.WithValue(ctx, "myKey", "a3")
}
}()
go func() {
for {
fmt.Println(ctx.Value("myKey"))
}
}()
go func() {
for {
fmt.Println(ctx.Value("myKey"))
}
}()
time.Sleep(3 * time.Second)
}

输出结构一直是 “a1” 因为 context.WithValue() 方法,是给予 ctx 创建了一个新的 valueCtx ,而对 ctx 本身并无任何改动。


参考文档:

1、Golang 之context用法

2、深入理解Golang之context

3、Go 语言并发编程与 Context