channel原理

inner workings

concurrency features

并发能力

  • goroutines

    to execute tasks independently, potentially in parallel.

    可以独立执行任务,也可以并行执行任务

  • channels

    for communication, synchronization between goroutines.

    在两个 goroutine 之间提供信息共享和同步

例如:处理每一个任务

1
2
3
4
5
6
7
8
9
10
func main() {
tasks := getTasks()

// Process each task.
for _, task := range tasks {
process(task)
}

...
}

优化,使用 channel 缓存任务

1
2
3
func main() {
// Buffered channel.
ch := make(chan Task, 3)

使用多个 goroutine ,并发处理任务

1
2
3
4
5
6
7
8

func main() {
// Buffered channel.
ch := make(chan Task, 3)
// Run fixed number of workers.
for i := 0; i < numWorkers; i++ {
go worker(ch)
}

通过 channel 发送任务,并发处理任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func main() {
// Buffered channel.
ch := make(chan Task, 3)
// Run fixed number of workers.
for i := 0; i < numWorkers; i++ {
go worker(ch)
}
// Send tasks to workers.
hellaTasks := getTasks()
for _, task := range hellaTasks {
taskCh <- task
}
...
}

func worker(ch) {
for {
// Receive task.
task := <-taskCh
process(task)
}
}

channels are inherently interesting

  • goroutine-safe

    线程安全

  • store and pass values between coroutines

    在两个 goroutine 之间存储和传递 value

  • provide FIFO semantics

    提供 先进先出的语义

  • can cause goroutines to block and unblock

    可能导致 goroutine 阻塞和唤醒。

making channels

the hchan struct

创建 channels,也就是 hchan 结构体。

make chan

创建 chan 有两种方式,无缓冲和带缓冲

1
2
# buffered channel
ch := make(chan Task, 3)
1
2
# unbuffered channel 
ch := make(chan int)

带缓冲的 channel 具备上述四个特点。

go/src/runtime/chan.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements 环形队列
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index 发送时的下标
recvx uint // receive index 接受时的下标
recvq waitq // list of recv waiters
sendq waitq // list of send waiters

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
// 锁保护了hchan中的所有字段,以及在该通道上被阻塞的几个 sudogs 中的一些字段。
// 在持有此锁时,请不要更改另一个G的状态(特别是不要准备一个G),因为这可能会与堆栈收缩发生死锁。
lock mutex // 互斥锁
}

chansendx = 0revex = 0

image-20231225103520748

一个入队,sendx = 1revex = 0

image-20231225103853807

再入两个,满队,sendx = 0revex = 0

image-20231225103929168

出队一个,sendx = 0revex = 1

image-20231225104154814

allocates memory

内存分配

allocates an hchan struct on the heap.

image-20231225104435846

初始化 channel 的时候(也就是 hchan),内存分配在堆上。同时返回一个 hchan 的指针。

image-20231225104627325

这也就是为什么可以在两个函数之间传递 channels,而不需要传递 channels 的指针。

sends and receives

goroutine scheduling

发送和接受,也就是 goroutine 之间调度

sends

task queue

例如在任务队列中,G1 负责分发任务,G2 负责接收任务。

G1:

1
2
3
4
5
6
7
func main() {
...
for _, task := range tasks {
taskCh <- task
}
...
}

G2:

1
2
3
4
5
6
func worker() {
for {
task := <-taskCh
process(task)
}
}

实际情况:

在 G1 侧:ch <- task0

task0 发送到 ch

image-20231225105049154
  1. acquire:拿锁

    image-20231225105142340
  2. enqueue:将 task0 的结构体拷贝一份,放到 cha 中(如果 task0 是指针,则拷贝的是8个字节,如果是结构体,则拷贝整个结构体。)

    image-20231225105248461
  3. 释放锁

    image-20231225105622677

在 G2 侧:t := <- ch

ch 中获取任务

image-20231225105908552
  1. 获取锁

    image-20231225105930500
  2. 出队,将 task0 的拷贝从队列中取出来,重新赋值给 t

    image-20231225110023956
  3. 释放锁

    image-20231225110153190

no shared memory ( except hchan )

可以看到,整个过程没有共享内存(除了 hchan 之外),而是使用 copies 拷贝对象。

Do not communicate by sharing memory; 
 instead, share memory by communicating.

​ (no shared memory) copies

如果超过队列最大限度

  • 发送两个任务到 ch

    ch <- task1

    ch <- task2

    image-20231225111111576
  • 再发送一个任务到 ch

    ch <- task3

    image-20231225111230442
  • ch 满了,再发送一个任务到 ch

    ch <- task4

    image-20231225111307480

此时 chanel is full,队列已满。

G1 执行阻塞,在队列接受后恢复。(通过 GMP 调度模型,当 G1 在阻塞之后,P 会切换到 g0 到 M 上执行,G1 的状态变为 gopark,从而避免其他 goroutine 无法被调度执行。)

task 被取走,G1 从阻塞状态恢复到正常状态。

the runtime scheduler

运行时调度

goroutine用户空间的线程,也就是说,创建和管理都是通过 Go runtime(g0),并不是 OS,相比 OS 的线程,Go runtime 的线程更加轻量。

runtime 调度会将 goroutines 调度到 OS 的线程上。

image-20231225112558323

Go 的 M:N 调度模型可以用三个结构体来表示

M:OS thread,操作系统线程

G:goroutine

P:context for scheduling,调度器:

  • 负责调度
  • 为了运行 G,M 一定会持有一个 P

image-20231225112929699

pausing goroutines

阻塞 goroutines

image-20231225113211310

ch <- task4

往满 channel 中发送时,这个 G 进入阻塞状态,G 进入 gopark,G1 改为 waiting

image-20231225113223575

取消 G1 和 M 之间的关联关系

image-20231225113256978

从 P 中调度一个可运行的 G到 M 上执行,让 M 不处于阻塞状态。

image-20231225113351332

这种处理方式,G1 根据需要被阻塞,但操作系统线程没有被阻塞。

当有一个消息从 channel 中被取出,ch 没有满时,G1 不再被阻塞,从阻塞状态中恢复回来。

resuming goroutines

hchan 结构体中存储等待的 sendersreceivers

image-20231225113950734

go/src/runtime/chan.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters 接受者的等待列表
sendq waitq // list of send waiters 发送者的等待列表

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}

type waitq struct { // 等待列表中
first *sudog // 候补 g 的链表
last *sudog
}

go/src/runtime/runtime2.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g // 阻塞的 G

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack) 阻塞时,往 channel 中发送的数据的指针

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

G1 阻塞时,会创建一个 sudog,并且将自己存储进去

image-20231225114044216

并且将这个 sudog 放到 sendq 中。receiver 通过 sendq 来唤醒 G1。而且是在执行调度之前就将 G 唤醒(也就是先唤醒 G ,然后 G 等待被 M 调度)image-20231225114151682

G2 开始执行

t := <- ch

image-20231225114411502

task1ch 中取出来

image-20231225114658029

取走之后,从 sendq 中获取链表中的 first sodug

image-20231225114825515

sudog 中的 elem 中的 task4 放到 ch

image-20231225114918534

将 G1 的状态从 gopark 改为 goready

image-20231225115132024

并且将 G1 调度到本地队列,依据亲缘性原则,将 G1 放到 gonext

image-20231225115205734

sends and receives when the receiver comes first.

发送和接受,是接收首先发生。

receives

当接收者阻塞

t := <- ch

从一个空 channel 中接收数据

image-20231225115424467

此时,会将 G2 暂停,同样,将 G2 状态改为 gopark,将 G2 放到 sudog 中,加入到 chrecvq 中。

并且将 t 的指针存储到 elem 中(这是很有意思的点)

image-20231225115604787

在 G1 中,当 task 被放到 ch 中时:

本应该将 task 放到 buf 中,然后修改 G2 状态到 goready

但是这里有一个 smarter 的方法:

elem 中存储了 t 的指针,G1 将 task 直接存储到 t 上。

image-20231225115938753

t 其实是一个局部变量,也就是 t 是存储在栈上的。也就是说,G1 直接操作了 G2 的栈。

image-20231225153720777

正常情况下,如果一个线程操作另外一个线程的栈,会直接报错,或者引起数据不一致。但是在 Go 中,可以通过保证 happens before 来确保数据一致性。

这种情况,只会发生在 runtime 中。

在恢复时,G2不需要获取通道锁并操作缓冲区。同时,也只会有很少的内存拷贝。

understand channels

  • goroutine-safe

    hchan 中有互斥锁

  • Store values, pass in FIFO

    拷贝到 hchanbuffer,也从 buffer 拷贝出来

  • can cause goroutines to pause and resume

    利用hchansudog 队列

    调用运行时调度器,goparkgoready

unbuffered channels

无缓冲的 channels 的工作方式总是与 direct send 方式一致:

  • 接收者先行:发送者写入到接收者的 stack
  • 发送者先行:接收者直接从 sudog 上直接获取数据

select

一般情况下:

  • 给所有的 channel 加锁
  • 将一个 sudog 放到所有 channelsendq/recvq 队列中
  • 解锁,处于 select 下的 G 置于阻塞状态
  • 使用 CAS 操作,因此只会有一个 channel 可继续执行
  • resuming mirrors the pause sequence.(恢复镜像暂停序列)

stepping back:

design considerations,设计思想。

Go channel 的设计思想主要是 简单(simplicity) 和 高效(performance)。

Simplicity

使用锁的队列优先与无锁队列:

“The performance improvement does not materialize from the air, it comes with code complexity increase.” — dvyokov

“性能的提升并非凭空出现,而是伴随着代码复杂度的增加。” — dvyokov

使用锁,相比代码和使用更加简单。

performance

调用 runtime 调度器:

  • OS 线程不会被阻塞(G 阻塞时,M上会执行其他 G)

goroutine 栈的读和写

  • goroutine 唤醒的路径是无锁的(唤醒后,不再从 buffer 中加锁获取数据)
  • 减少潜在的内存拷贝(唤醒后,直接通过 sudog 拿到数据,无需拷贝到 buffer;或者唤醒前,将数据放到 sudog,也无需拷贝到 buffer

这两点,通过在 GC、压缩栈上,优化了内存管理。

这是在简单性和高效性上做的精确的取舍。

unbuffered channels

无缓冲的 channels

1
ch := make(chan int)

无缓冲 channel 的工作方式:

如果是接收者先行:(接收者从 ch 中接收数据,<- ch

  • sudog 放到 recvq 队列中,然后阻塞
  • 随后,接收者将 直接发送 数据到 sudog,然后恢复接收者

如果是发送者先行:(发送者往 ch 中发送数据,ch <- 1

  • 将它自己作为 sudog 放到 sendq 队列中,然后阻塞
  • 接收者从 sudog直接接收 数据,并且恢复发送者

selects

使用方式和例子

1
2
3
4
5
6
7
8
9
10
11
12
13
var taskCh = make(chan Task, 3)
var cmdCh = make(chan Command)

func worker() {
for {
select {
case task := <-taskCh:
process(task)
case cmd := <-cmdCh:
execute(cmd)
}
}
}

reference

GopherCon 2017: Kavya Joshi - Understanding Channels