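0. Introduction
This article walks through the internal implementation of Golang channels: the underlying data structure and the code behind each channel operation. The code is from go1.9rc1. Some unrelated code is omitted, such as race detection (the parts guarded by raceenabled).
1. hchan struct
The underlying data structure of a channel is the hchan struct.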
type hchan struct {
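qcount uint // number of elements currently in the queue
dataqsiz uint // capacity of the channel (size of the circular buffer)
buf unsafe.Pointer // circular array that stores the elements
elemsize uint16 // size of one element
closed uint32 // non-zero once the channel is closed
elemtype *_type // element type
sendx uint // buffer index for the next send
recvx uint // buffer index for the next receive
recvq waitq // goroutines blocked on the channel by a receive (<-ch)
sendq waitq // goroutines blocked on the channel by a send (ch<-)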
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
selectdone *uint32 // CAS to 1 to win select race (may point to stack)
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
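The comments above explain each field. A channel is essentially a queue plus a lock, where the lock is a lightweight runtime mutex. recvq is the list of goroutines blocked on the channel by a receive, and sendq is the list of goroutines blocked by a send. Both lists are doubly linked lists of sudog, which is essentially a wrapper around the g struct.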
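To make the "queue plus lock" picture concrete, here is a minimal sketch of a ring buffer guarded by a mutex. The type ringQueue and its methods are hypothetical names invented for this illustration; only the field names qcount, sendx and recvx are borrowed from hchan. It is not the runtime's code, and it has no wait queues, so it fails instead of blocking.

```go
package main

import (
	"fmt"
	"sync"
)

// ringQueue is a hypothetical, simplified model of the buffer part of hchan.
type ringQueue struct {
	mu     sync.Mutex // plays the role of hchan.lock
	buf    []int      // plays the role of hchan.buf
	qcount int        // number of stored elements
	sendx  int        // index of the next slot to write
	recvx  int        // index of the next slot to read
}

func newRingQueue(size int) *ringQueue {
	return &ringQueue{buf: make([]int, size)}
}

// trySend stores v if a slot is free and reports whether it did.
func (q *ringQueue) trySend(v int) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.qcount == len(q.buf) {
		return false // full: a real channel would park the sender here
	}
	q.buf[q.sendx] = v
	q.sendx++
	if q.sendx == len(q.buf) {
		q.sendx = 0 // wrap around
	}
	q.qcount++
	return true
}

// tryRecv removes the oldest element if there is one.
func (q *ringQueue) tryRecv() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.qcount == 0 {
		return 0, false // empty: a real channel would park the receiver here
	}
	v := q.buf[q.recvx]
	q.recvx++
	if q.recvx == len(q.buf) {
		q.recvx = 0 // wrap around
	}
	q.qcount--
	return v, true
}

func main() {
	q := newRingQueue(2)
	fmt.Println(q.trySend(1), q.trySend(2), q.trySend(3)) // true true false
	v, ok := q.tryRecv()
	fmt.Println(v, ok) // 1 true
}
```

The wrap-around of sendx and recvx is the same pattern that appears in chansend and chanrecv later in this article.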
2. make
Creating a channel with make maps to the following code.
func makechan(t *chantype, size int64) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
panic(plainError("makechan: size out of range"))
}
var c *hchan
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
} else {
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
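The first two if statements are sanity checks: a limit on the element size and a limit on alignment. The third if rejects a size that is negative or too large. The condition int64(uintptr(size)) != size catches a size that does not survive the conversion to uintptr, which can happen on 32-bit platforms where uintptr is narrower than int64. The last condition deserves a closer look:
uintptr(size) > (_MaxMem-hchanSize)/elem.size
As described in my article on Golang memory management, _MaxMem is the maximum size of the arena region, which backs the heap. In other words, a channel is allocated on the heap.
Next comes the allocation itself. If the element type contains no pointers and size > 0, the hchan and its buffer are allocated in one contiguous block of memory. If size = 0, no space is allocated for buf at all.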
if elem.kind&kindNoPointers != 0 || size == 0 {
// Allocate memory in one call.
// Hchan does not contain pointers interesting for GC in this case:
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this location for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
c.buf = unsafe.Pointer(c)
}
}
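In the remaining case, where the element type contains pointers and size > 0, the hchan and its buf are allocated separately. The rest of the function fills in the remaining fields.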
else {
c = new(hchan)
c.buf = newarray(elem, int(size)) // newarray also allocates through mallocgc
}
To summarize, make for a channel allocates on the heap and returns a pointer to an hchan.
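From user code, two of the hchan fields can be observed through builtins: cap(ch) reports the capacity passed to make (dataqsiz) and len(ch) reports the number of buffered elements (qcount). The helper describe below is a hypothetical name used only for this small example.

```go
package main

import "fmt"

// describe returns the capacity and the current length of ch.
func describe(ch chan int) (capacity, length int) {
	return cap(ch), len(ch)
}

func main() {
	unbuffered := make(chan int) // size 0: no buffer space is allocated
	buffered := make(chan int, 3)
	buffered <- 10
	buffered <- 20

	c, l := describe(unbuffered)
	fmt.Println(c, l) // 0 0
	c, l = describe(buffered)
	fmt.Println(c, l) // 3 2
}
```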
3. send
A send, ch <- x, maps to the following functions.
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
...
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
3.1 nil channel
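Start with the nil channel case, that is, sending to a channel that was never created with make. The previous article, Understanding Go Channel in Depth (深入理解 Go Channel), left one question open: why does sending to a nil channel fail with fatal error: all goroutines are asleep - deadlock!?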
if c == nil {
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
//runtime/trace.go
traceEvGoStop = 16 // goroutine stops (like in select{}) [timestamp, stack]
//runtime/proc.go
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string,
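gopark puts the current goroutine into a waiting state. unlockf is only a hook that runs while the goroutine is being parked; as the comment above says, the goroutine is resumed right away if unlockf returns false. Here unlockf is nil and there is no channel through which another goroutine could ever wake the sender, so a goroutine that sends to a nil channel sleeps forever. Receiving from a nil channel is handled the same way. Here is the example from the previous article again.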
func main() {
var x chan int
go func() {
x <- 1
}()
<-x
}
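Here the main goroutine receives from a nil channel and goes to sleep. The goroutine started by go func() sends to the nil channel and goes to sleep as well. The runtime checks for this situation in checkdead(), which is called, for example, when a thread goes idle: if no goroutine is left that can run, it throws the error.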
func checkdead() {
...
throw("all goroutines are asleep - deadlock!") // this is where the error message comes from
}
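The following sketch shows the "sleeps forever" behavior without crashing the program: the send runs in a helper goroutine while the caller waits with a timeout, so there is always one goroutine that can still run and checkdead does not fire. The function sendCompletes and its timeoutMs parameter are hypothetical names for this illustration. The helper goroutine that sends on the nil channel is leaked on purpose.

```go
package main

import (
	"fmt"
	"time"
)

// sendCompletes reports whether a send on ch finishes within timeoutMs milliseconds.
func sendCompletes(ch chan int, timeoutMs int) bool {
	done := make(chan struct{})
	go func() {
		ch <- 1 // parks forever when ch is nil
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(time.Duration(timeoutMs) * time.Millisecond):
		return false
	}
}

func main() {
	var nilCh chan int
	// A send on a nil channel never completes, so the timeout wins.
	fmt.Println(sendCompletes(nilCh, 50))
	// A channel with one free buffer slot accepts the send without blocking.
	fmt.Println(sendCompletes(make(chan int, 1), 1000))
}
```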
3.2 closed channel
Sending to a closed channel panics immediately.
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
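The panic can be observed from user code with recover. This is a small sketch; sendOnClosed is a hypothetical helper name, and the message it returns is the string passed to plainError in the runtime code above.

```go
package main

import "fmt"

// sendOnClosed closes a channel, sends on it, and returns the recovered panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics even though the buffer has free space
	return "no panic"
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
}
```

Note that the closed check comes before the buffer check in chansend, which is why free buffer space does not help.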
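3.3 Handling the send
A send falls into one of three cases:
(1) A receiver goroutine is blocked on the channel, which means hchan.buf is empty: the value is handed directly to that goroutine.
(2) hchan.buf still has free space: the value is stored in the buffer.
(3) hchan.buf is full: the current goroutine blocks.
The first case is shown below. The runtime dequeues a waiting goroutine from the channel's recvq and calls send. goready is responsible for waking that goroutine.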
lock(&c.lock)
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
The second case is simpler. Comparing qcount with dataqsiz tells whether hchan.buf still has free space. Besides copying the value, sendx and qcount also need to be updated.
lock(&c.lock)
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
The third case is shown below.
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // initialization
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // add the current goroutine to the wait queue
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) // sleep
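The order of the three checks can be condensed into a toy decision function. This is only a model for an open, non-nil channel in blocking mode; the type sendPath and the function chooseSendPath are hypothetical names, and the real chansend does considerably more work on each path.

```go
package main

import "fmt"

// sendPath is a hypothetical label for the branch a send takes.
type sendPath string

const (
	direct   sendPath = "hand off to waiting receiver"
	buffered sendPath = "copy into buffer"
	blocked  sendPath = "park the sender"
)

// chooseSendPath mirrors the order of the checks in chansend:
// waiting receiver first, then free buffer space, then blocking.
func chooseSendPath(receiverWaiting bool, qcount, dataqsiz uint) sendPath {
	if receiverWaiting {
		return direct
	}
	if qcount < dataqsiz {
		return buffered
	}
	return blocked
}

func main() {
	fmt.Println(chooseSendPath(true, 0, 0))  // unbuffered, receiver already waiting
	fmt.Println(chooseSendPath(false, 1, 3)) // buffered, space left
	fmt.Println(chooseSendPath(false, 3, 3)) // buffered, full
	fmt.Println(chooseSendPath(false, 0, 0)) // unbuffered, nobody waiting
}
```

For an unbuffered channel dataqsiz is 0, so qcount < dataqsiz is never true and a send either finds a waiting receiver or blocks.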
4. recv
Receiving from a channel (<-c) closely mirrors sending.
4.1 nil channel
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
...
}
4.2 closed channel
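When receiving from a closed channel, if the channel still holds data the receive continues with the normal flow below. If no data is left, the receive returns the zero value of the element type. With the ok idiom (v, ok := <-ch), the second result is false.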
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
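A short sketch of this behavior from the user side: values buffered before close are still delivered, and only then does the receive report the zero value with ok set to false. The helper drainClosed is a hypothetical name for this example.

```go
package main

import "fmt"

// drainClosed buffers the given values, closes the channel, and receives
// until ok turns false. It returns the values received with ok == true,
// plus the value and ok flag of the final receive.
func drainClosed(values ...int) (got []int, last int, ok bool) {
	ch := make(chan int, len(values))
	for _, v := range values {
		ch <- v
	}
	close(ch)
	for {
		v, more := <-ch
		if !more {
			return got, v, more
		}
		got = append(got, v)
	}
}

func main() {
	got, last, ok := drainClosed(1, 2)
	fmt.Println(got, last, ok) // [1 2] 0 false
}
```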
4.3 Handling the receive
A sender goroutine is blocked on the channel (buf is full, or the channel is unbuffered)
lock(&c.lock)
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
buf has data available
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
buf is empty: block
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
5. close
Closing a channel, close(ch), maps to the following code (with some redundant code removed).
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist *g
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, unsafe.Pointer(c))
}
gp.schedlink.set(glist)
glist = gp
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for glist != nil {
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
}
}
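Besides setting c.closed to 1, closing a channel also has to:
(1) wake the goroutines blocked in recvq
(2) wake the goroutines blocked in sendq
It does this by walking recvq and then sendq, collecting every goroutine into glist, and finally, after releasing the channel lock, waking each goroutine in glist.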
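The effect on receivers can be seen with a common broadcast pattern: several goroutines wait on one channel and a single close releases all of them. This is a sketch with a hypothetical helper releaseAll. Some goroutines may only reach the receive after close has already run; those return through the closed-channel check in chanrecv instead of being dequeued from recvq, but the visible result is the same.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// releaseAll starts n goroutines that receive from done, closes done,
// and returns how many goroutines got past the receive.
func releaseAll(n int) int {
	done := make(chan struct{})
	var released int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done // returns once the channel is closed
			atomic.AddInt64(&released, 1)
		}()
	}
	close(done) // one close releases every receiver
	wg.Wait()
	return int(atomic.LoadInt64(&released))
}

func main() {
	fmt.Println(releaseAll(5)) // 5
}
```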
6. select channel
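The select statement is implemented in runtime/select.go. This article does not cover the select implementation itself. What we look at here is how select works together with a channel, specifically a select with a single channel case and a default.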
select {
case c <- v:
... foo
default:
... bar
}
is compiled to
if selectnbsend(c, v) {
... foo
} else {
... bar
}
The selectnbsend function is as follows.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc(unsafe.Pointer(&c)))
}
select {
case v = <-c:
... foo
default:
... bar
}
is compiled to
if selectnbrecv(&v, c) {
... foo
} else {
... bar
}
The selectnbrecv function is as follows.
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
selected, _ = chanrecv(c, elem, false)
return
}
select {
case v, ok = <-c:
... foo
default:
... bar
}
is compiled to
if c != nil && selectnbrecv2(&v, &ok, c) {
... foo
} else {
... bar
}
The selectnbrecv2 function is as follows.
func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
// TODO(khr): just return 2 values from this function, now that it is in Go.
selected, *received = chanrecv(c, elem, false)
return
}
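All three forms pass block = false to chansend or chanrecv, so they never park the goroutine. The sketch below wraps the user-level forms in two hypothetical helpers, trySend and tryRecv, and exercises the cases discussed in this article: a full buffer, an empty buffer, a closed channel and a nil channel.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the value was sent.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// tryRecv attempts a non-blocking receive. selected reports whether the
// receive case ran; ok is false when the value comes from a closed channel.
func tryRecv(ch chan int) (v int, ok, selected bool) {
	select {
	case v, ok = <-ch:
		return v, ok, true
	default:
		return 0, false, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had a free slot
	fmt.Println(trySend(ch, 2)) // false: the buffer is full
	fmt.Println(tryRecv(ch))    // 1 true true
	fmt.Println(tryRecv(ch))    // 0 false false: empty, default case ran
	close(ch)
	fmt.Println(tryRecv(ch)) // 0 false true: closed and drained

	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false: a nil channel is never ready
}
```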
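7. Summary
The Golang channel implementation lives in runtime/chan.go. The code itself is not very complex, but it touches many other runtime details, such as gopark, so it still takes some effort to read.
8. References
Go Source Code 1.9rc1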