Golang深入理解channel
前言
Golang在并发编程上有两大利器,分别是channel
和goroutine
,这篇文章我们先聊聊channel
。熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync
包中提供了传统的锁机制,但更推荐使用channel
来解决并发问题。这篇文章会先从channel
的用法、channel
的原理两部分对channel
做一个较为深入的探究。
channel用法
什么是channel
从字面上看,channel
的意思大概就是管道的意思。channel
是一种go协程用以接收或发送消息的安全的消息队列,channel
就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:
channel
的用法很简单:
func main() {
ch := make(chan int, 1) // 创建一个类型为int,缓冲区大小为1的channel
ch <- 2 // 将2发送到ch
n, ok := <- ch // n接收从ch发出的值
if ok {
fmt.Println(n) // 2
}
close(ch) // 关闭channel
}
使用channel
时有几个注意点:
- 向一个
nil
channel
发送消息,会一直阻塞; - 向一个已经关闭的
channel
发送消息,会引发运行时恐慌(panic)
; channel
关闭后不可以继续向channel
发送消息,但可以继续从channel
接收消息;- 当
channel
关闭并且缓冲区为空时,继续从从channel
接收消息会得到一个对应类型的零值。
Unbuffered channels与Buffered channels
Unbuffered channels
是指缓冲区大小为0的channel
,这种channel
的接收者会阻塞直至接收到消息,发送者会阻塞直至接收者接收到消息,这种机制可以用于两个goroutine
进行状态同步;Buffered channels
拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。
引用The Nature Of Channels In Go中的两张图来说明Unbuffered channels
与Buffered channels
, 非常形象,读者可自行体会一下:
Unbuffered channels
:
Unbuffered channels
Buffered channels
:
Buffered channels
channel的遍历
for range
channel
支持 for range
的方式进行遍历:
package main
import "fmt"
func main() {
ci := make(chan int, 5)
for i := 1; i <= 5; i {
ci <- i
}
close(ci)
for i := range ci {
fmt.Println(i)
}
}
值得注意的是,在遍历时,如果channel
没有关闭,那么会一直等待下去,出现 deadlock
的错误;如果在遍历时channel
已经关闭,那么在遍历完数据后自动退出遍历。也就是说,for range
的遍历方式时阻塞型的遍历方式。
for select
select
可以处理非阻塞式消息发送、接收及多路选择。
package main
import "fmt"
func main() {
ci := make(chan int, 2)
for i := 1; i <= 2; i {
ci <- i
}
close(ci)
cs := make(chan string, 2)
cs <- "hi"
cs <- "golang"
close(cs)
ciClosed, csClosed := false, false
for {
if ciClosed && csClosed {
return
}
select {
case i, ok := <-ci:
if ok {
fmt.Println(i)
} else {
ciClosed = true
fmt.Println("ci closed")
}
case s, ok := <-cs:
if ok {
fmt.Println(s)
} else {
csClosed = true
fmt.Println("cs closed")
}
default:
fmt.Println("waiting...")
}
}
}
select
中有case
代码块,用于channel
发送或接收消息,任意一个case
代码块准备好时,执行其对应内容;多个case
代码块准备好时,随机选择一个case
代码块并执行;所有case
代码块都没有准备好,则等待;还可以有一个default
代码块,所有case
代码块都没有准备好时执行default
代码块。
channel原理
先贴一下channel
的源码地址,读者可以对照来看。
数据结构
先看channel
的结构体:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
// channel中元素大小
elemsize uint16
// 是否已关闭
closed uint32
// channel中元素类型
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
channel
的缓冲区其实是一个环形队列,qcount
表示队列中元素的数量,dataqsiz
表示环形队列的总大小,buf
表示一个指向循环数组的指针;sendx
和recvx
分别用来标识当前发送和接收的元素在循环队列中的位置;recvq
和sendq
都是一个列表,分别用于存储当前处于等待接收和等待发送的Goroutine
。
再看一下waitq
的数据结构:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 当前goroutine
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
其中sudog
表示处于等待列表中的Goroutine
封装,包含了一些上下文信息,first
和last
分别指向等待列表的首位的Goroutine
。
编译分析
在分析channel
的原理之前,我们先使用go tool
分析以下代码,看看channel
的各种操作在底层调用了什么运行时方法:
ch := make(chan int, 2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
fmt.Println(n)
}
close(ch)
编译
go build test.go
go tool objdump -s "main\.main" test | grep CALL
把CALL
过滤出来:
test.go:118 0x1092f55 e81612f7ff CALL runtime.makechan(SB)
test.go:119 0x1092f74 e82714f7ff CALL runtime.chansend1(SB)
test.go:120 0x1092f8e e80d14f7ff CALL runtime.chansend1(SB)
test.go:121 0x1092fa5 e8361ff7ff CALL runtime.chanrecv1(SB)
test.go:122 0x1092fbd e85e1ff7ff CALL runtime.chanrecv2(SB)
test.go:126 0x1092fd7 e8841cf7ff CALL runtime.closechan(SB)
test.go:124 0x1092fea e8b156f7ff CALL runtime.convT64(SB)
print.go:275 0x1093041 e88a98ffff CALL fmt.Fprintln(SB)
test.go:47 0x1093055 e896c1fbff CALL runtime.morestack_noctxt(SB)
创建
从上面的编译分析可以看出在创建channel
时调用了运行时方法makechan
:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.kind&kindNoPointers != 0:
// 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 hchan必需的空间。
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素中包含指针,为hchan和缓冲区分别分配空间
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
makechan
的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan
,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel
中元素大小为0时,只需分配channel
必需的空间即可;当channel
元素类型不是指针时,则只需要为hchan
和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan
必需的空间;默认情况,缓冲区包含指针,则需要为hchan
和缓冲区分别分配内存。最后更新hchan
的其他字段,包括elemsize
,elemtype
,dataqsiz
。
发送
channel
的发送操作调用了运行时方法chansend1
, 在chansend1
内部又调用了chansend
,直接来看chansend
的实现:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// channel为nil
if c == nil {
// 如果是非阻塞,直接返回发送不成功
if !block {
return false
}
// 否则,当前Goroutine阻塞挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 【文末有惊喜!】一文读懂golang channel
Golang Channel原理
Go 语言为什么这么快,带你详细了解Golang CSP并发模型
Go并发编程——channel
golang goroutine 通知_深入golang之---goroutine并发控制与通信
Golang学习小结、从入门到精通资料汇总
图解 Go 并发编程
Go语言并发模型:以并行处理MD5为例
golang channel的使用以及调度原理
golang学习笔记(二)—— 深入golang中的协程