Go Channel 详解

Channel 在 Go 语言中的地位,相当于 Goroutine 的孪生兄弟,Goroutine 负责并发,而 Channel 就是他们之间的通信机制,它可以在多个 goroutine 之间发送信息。

它也是 Go 语言中最常见的、也是经常被人提及的设计模式:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。在很多主流的编程语言中,多个线程传递数据的方式一般都是共享内存,为了解决线程竞争,我们需要限制同一时间能够读写这些变量的线程数量,然而这与 Go 语言鼓励的设计并不相同。

虽然我们在 Go 语言中也能使用共享内存加互斥锁进行通信,但是 Go 语言提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。GoroutineChannel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。

Channel 在多并发操作里是属于协程安全的,并且遵循了 FIFO (First In First Out, 先入先出) 特性。即先执行读取的 goroutine 会先获取到数据,先发送数据的 goroutine 会先输入数据。另外,channel 的使用将会引起 Go runtime 的调度调用,会有阻塞和唤起 goroutine 的情况产生。

Channel的结构图:

一、创建和关闭 Channel

创建Channel的时候,有三个关键信息需要进行传入定义:

  • 接受方向,可在使用 <- 进行声明
  • 数据类型
  • 缓冲空间大小
1
2
3
4
5
6
7
8
9
10
// 接受方向
ch1 := make(chan T) // 不声明方向,为双向Channel,即可接收T类型数据也可发送T类型数据
ch2 := make(chan<- T) // 只可用来向Channel发送 T 类型数据
ch3 := make(<-chan T) // 只可用来从Channel接收 T 类型数据

// 数据类型
ch4 := make(chan int) // 双向的 channel 可以用来收发,类型是 int

// 缓冲空间
ch5 := make(chan int, 100) // 双向的 channel 可以用来收发,类型是 int,大小 100

注: make(chan T) 返回的是一个指向 runtime.hchan 结构体的指针

其中 <-总是优先和最左边的类型结合,如下:

1
2
3
4
chan<- chan int    // 等价 chan<- (chan int)
chan<- <-chan int // 等价 chan<- (<-chan int)
<-chan <-chan int // 等价 <-chan (<-chan int)
chan (<-chan int)

Channel 的关闭操作可以直接使用内建的 close 方法,如下:

1
close(ch)

注:向已关闭的channel中发送数据

二、Channel 信息收发和阻塞

1、基础操作
1
2
3
4
5
6
// send语句:将数据传入channel
ch <- 1
// receive语句:从channel读取数据
i := <- ch
// channel的 receive支持 multi-valued assignment,可以用于检查 channel 是否已被关闭,如
v, ok := <- ch
2、关于阻塞:

(1) send 语句 的阻塞场景

  • 当 receiver 未准备好时,send语句会被阻塞

  • ch 为有缓冲空间的 Channel,且其缓冲空间已存满,send 语句会被阻塞

(2)receive语句:

  • 当 sender 未准备好时,receive 语句会被阻塞
  • ch 为有缓冲空间的 Channel,且 Channel 为空时,receive 语句会被阻塞

从一个被 close 的 Channel 中接收数据不会被阻塞,而是立即返回,接收完已发送的数据后会返回元素类型的零值(zero value)。这时候你可以使用一个额外的返回参数来检查channel是否关闭。

看个有趣的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
c := make(chan interface{}, 2)

c <- 1
c <- nil

close(c)

res1, ok1 := <- c
res2, ok2 := <- c
res3, ok3 := <- c
fmt.Println(res1, ok1)
fmt.Println(res2, ok2)
fmt.Println(res3, ok3)
}

得到的输出结果如下:

1
2
3
1 true
<nil> true
<nil> false

这里有两个点:

  • Channel 被关闭后如果缓冲区还有内容,在 receive 操作时其为空判定值(ok) 仍返回 true
  • 第二行的输出 <nil> 是 channel 返回的内容,第三行的 <nil> 其实是 channel 已经为空,这时候通过 ok == false 来判断 Channel 已被关闭了

综上,判定 Channel 已关闭,需要两个返回值一起判定:res 为当前类型零值,且判定值 ok 返回 false

3、Range

for …… range语句可以处理Channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {
go func() {
time.Sleep(1 * time.Hour)
}()

c := make(chan int)
go func() {
for i := 0; i < 10; i = i + 1 {
c <- i
}
close(c)
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}

range c 产生的迭代值为Channel中发送的值,它会一直迭代直到channel被关闭。上面的例子中如果把close(c)注释掉,程序会一直阻塞在for …… range那一行。

  • close(c) 保留时,该段 demo 中的第一个 time.Sleep(1 * time.Hour) 是可以注释掉的
  • close(c)time.Sleep(1 * time.Hour) 一同注释掉时, for....range 在输出完 c 中的所有信息后,会报 Panic:fatal error: all goroutines are asleep - deadlock!

三、Select 操作

1、select 常规使用

select语句选择一组可能的 send 操作和 receive 操作去处理。它类似 switch, 但是只是用来处理通讯(communication)操作。
它的 case 可以是 send 语句,也可以是 receive 语句,亦或者 default

receive语句可以将值赋值给一个或者两个变量。它必须是一个 receive操作。

最多允许有一个 default case,它可以放在 case 列表的任何位置,尽管我们大部分会将它放在最后。

当没有 case 需要处理是,select 会进入阻塞状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// demo: select + channel 实现斐波那契数列
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}

func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}

上述demo中,

2、select的坑:执行顺序

上述 select 的demo中,正常执行,将得到长度为10的斐波那契数列。但如果将 quit <- 0 这一句放在main函数中 for 循环前面:

1
2
3
4
5
6
7
go func() {
quit <- 0

for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
}()

再执行,结果将是直接输出 quit,或者得到一个1-10位随机长度的斐波那契数列。

原因是:select 在多个 channel 都满足条件的情况下,执行顺序是随机的

3、select的超时处理

前面提到,如果没有 case 需要处理,select语句就会一直阻塞着。这时候我们可能就需要一个超时操作,用来处理超时的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}

其实它利用的是time.After方法,它返回一个类型为<-chan Time的单向的channel,在指定的时间发送一个当前时间给返回的channel中。

四、Timer 和 Ticker

1
2
3
4
5
// time.Timer 结构体
type Timer struct {
C <-chan Time
r runtimeTimer
}

Timer 是一个定时器,其对外提供的结构体如上所示。它提供一个 Channel ,在创建时传入一个计时时长,并在到时间后通过channel返回一个信号,信号值为计时结束时间戳的 time.Time 对象。

Timer 的一些基础用法:

1
2
3
4
5
6
7
8
timer1 := time.NewTimer(time.Second) // 倒计时1s
endTime := <- timer1.C // 倒计时结束

endTime2 := <- time.After(time.Second) // 倒计时一秒结束,简单的等待相当于 time.Sleep(time.Second)

stop := timer1.Stop() // 强行停止计时器

timer1.Reset(time.Second * 10) // 重置计时器为倒计时10s - 从reset开始重新计时

TickerTimer 的区别在于:Timer 相当于一次性闹钟,设置时间间隔到期后提醒一次结束,除非再次Reset否则不回再执行,Ticker 是循环闹钟,每隔一定的时间间隔返回一次。

Ticker 的用法demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main()  {
ticker := time.NewTicker(2 * time.Second)

go func() {
for t := range ticker.C {
fmt.Println(t.Unix())
}
}()

time.Sleep(10 * time.Second)
ticker.Reset(1 * time.Second)
time.Sleep(10 * time.Second)
ticker.Stop()
}

其输出结果,先输出5次时间戳,间隔2s,后输出10次,间隔1s,然后结束。

Ticker 错误用法 Demo

1
2
3
4
5
6
7
8
func main() {
for {
select {
case <-time.Tick(3 * time.Second): // 这里会不断生成 ticker,而且 ticker 会进行重新调度,造成泄漏
fmt.Println("每隔3秒执行一次")
}
}
} 

五、单向 Channel 的使用场景和作用

为了防止被滥用,Go 语言的类型系统提供了单方向的 channel 类型,分别用于只发送或只接收的 channel 。类型 <-chan int 表示一个只接收 int 的 channel, chan<- int 表示一个只发送 int 的 channel,(箭头 <- 和关键字 chan 的相对位置表明了channel的方向。),这种限制将在编译期检测。

由此可知,单向 channel 独立存在是没有意义的,而前面提到,make(chan T) 返回的是一个指向 runtime.hchan 结构体的指针,所以 Channel 本身都是引用传递。这就决定了,单向 channel 只是通过其单向的限制性来保证操作的安全,而其指针指向的空间可以通过另外的 channel 变量来进行相对应的操作。如下demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func counter(out chan<- int) { // 参数类型为单向channel
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}

func squarer(out chan<- int, in <-chan int) { // 参数类型为单向channel
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) { // 参数类型为单向channel
for v := range in {
fmt.Println(v)
}
}
func main() {
naturals := make(chan int) // 原始变量本身为双向channel
squares := make(chan int)
go counter(naturals) // 赋值传入,包含缓冲区指针
go squarer(squares, naturals)
printer(squares)
}

参考资料:

1、Go Channel 详解|鸟窝

2、详解Golang中Channel的用法

3、Golang定时器——Timer 和 Ticker