1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
// src/runtime/chan.go:454
// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果chan是nil的话
if c == nil {
// 非阻塞调用,则直接返回false, false
if !block {
return
}
// 阻塞调用,一直等待接收nil的chan,goroutine挂起
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 如果是非阻塞且chan是空的
if !block && empty(c) {
// 如果chan是未关闭的,直接返回false,false
if atomic.Load(&c.closed) == 0 {
return
}
// chan已经关闭,并且为空,老实说。这段代码感觉有点多余,下面也处理了这种情况
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
// 对于已关闭的chan执行接收,不忽略返回值的情况下,会受到该类型的零值,清理ep的内存
typedmemclr(c.elemtype, ep)
}
// 返回selected为true
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// chan已经关闭,且缓存中无数据,直接返回该类型的零值
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
if sg := c.sendq.dequeue(); sg != nil {
// 如果sender中有等待发送,那么可以分为两种情况
// 1、非缓冲队列,即同步chan,则直接从sender中接收值。
// 2、缓冲队列,即异步chan,从缓冲队列的头部拷贝到接收者,拷贝发送队列的值到缓冲队列末尾
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓冲型chan,buf里面有元素,直接从buf里面拿
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 代码里面需要接收值,则需要拷贝值,比如接收是`val<-ch`,而不是`<-ch`,需要把chan的值拷贝到val
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空掉原来buf中对应位置的值
typedmemclr(c.elemtype, qp)
// 接收index+1
c.recvx++
// 如果接收索引已经到末尾,重新移到队首
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区大小减一
c.qcount--
// 解锁
unlock(&c.lock)
return true, true
}
if !block {
// 非阻塞接收,解锁,返回false,false
unlock(&c.lock)
return false, false
}
// 无发送者,这个接收值需要被阻塞.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 构造一个接收数据的sudog.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 放入接受者队列中
c.recvq.enqueue(mysg)
// 将goroutine挂起
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
|