1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果chan是空
if c == nil {
// 非阻塞,直接返回false,表示未发送成功
if !block {
return false
}
// 阻塞的,挂起goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
// 如果是非阻塞的,chan未关闭,且chan的buffer已经满了,则返回发送失败
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
// 如果chan已经关闭了,再向chan发送数据,直接报panic
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 如果有接受者在等待,直接将发送的数据拷贝到
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果缓冲的chan,还有空间,将发送的数据拷贝到buffer中
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
// 发送游标+1
c.sendx++
// 发送游标已经到末尾了,重新移到队头
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区数量+1
c.qcount++
unlock(&c.lock)
return true
}
// 非阻塞的chan,直接返回写入失败
if !block {
unlock(&c.lock)
return false
}
// chan满了,发送者会被阻塞,构造一个sudog挂起
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 构造sudog放入发送者队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
|