教程集 www.jiaochengji.com
教程集 >  Golang编程  >  golang教程  >  正文 Golang深入理解channel

Golang深入理解channel

发布时间:2023-03-22   编辑:jiaochengji.com
教程集为您提供Golang深入理解channel等资源,欢迎您收藏本站,我们将为您提供最新的Golang深入理解channel资源

前言

Golang在并发编程上有两大利器,分别是channelgoroutine,这篇文章我们先聊聊channel。熟悉Golang的人都知道一句名言:“使用通信来共享内存,而不是通过共享内存来通信”。这句话有两层意思,Go语言确实在sync包中提供了传统的锁机制,但更推荐使用channel来解决并发问题。这篇文章会先从channel的用法、channel的原理两部分对channel做一个较为深入的探究。

channel用法

什么是channel

从字面上看,channel的意思大概就是管道的意思。channel是一种go协程用以接收或发送消息的安全的消息队列,channel就像两个go协程之间的导管,来实现各种资源的同步。可以用下图示意:

channel的用法很简单:

func main() {
    ch := make(chan int1// 创建一个类型为int,缓冲区大小为1的channel
    ch <- 2 // 将2发送到ch
    n, ok := <- ch // n接收从ch发出的值
    if ok {
        fmt.Println(n) // 2
    }
    close(ch) // 关闭channel
}

使用channel时有几个注意点:

  • 向一个nil channel发送消息,会一直阻塞;
  • 向一个已经关闭的channel发送消息,会引发运行时恐慌(panic)
  • channel关闭后不可以继续向channel发送消息,但可以继续从channel接收消息;
  • channel关闭并且缓冲区为空时,继续从从channel接收消息会得到一个对应类型的零值。

Unbuffered channels与Buffered channels

Unbuffered channels是指缓冲区大小为0的channel,这种channel的接收者会阻塞直至接收到消息,发送者会阻塞直至接收者接收到消息,这种机制可以用于两个goroutine进行状态同步;Buffered channels拥有缓冲区,当缓冲区已满时,发送者会阻塞;当缓冲区为空时,接收者会阻塞。

引用The Nature Of Channels In Go中的两张图来说明Unbuffered channelsBuffered channels, 非常形象,读者可自行体会一下:

Unbuffered channels

Unbuffered channels

Buffered channels

Buffered channels

channel的遍历

for range

channel支持 for range 的方式进行遍历:

package main  

import "fmt"  

func main() {  
    ci := make(chan int5)  
    for i := 1; i <= 5; i  {
        ci <- i
    }    
    close(ci)  

    for i := range ci {  
        fmt.Println(i)  
    }  
}  

值得注意的是,在遍历时,如果channel 没有关闭,那么会一直等待下去,出现 deadlock 的错误;如果在遍历时channel已经关闭,那么在遍历完数据后自动退出遍历。也就是说,for range 的遍历方式时阻塞型的遍历方式。

for select

select可以处理非阻塞式消息发送、接收及多路选择。

package main  

import "fmt"  

func main() {  
    ci := make(chan int2)
    for i := 1; i <= 2; i  {
        ci <- i
    }
    close(ci)

    cs := make(chan string2)
    cs <- "hi"
    cs <- "golang"
    close(cs)

    ciClosed, csClosed := falsefalse
    for {
        if ciClosed && csClosed {
            return
        }
        select {
        case i, ok := <-ci:
            if ok {
                fmt.Println(i)
            } else {
                ciClosed = true
                fmt.Println("ci closed")
            }
        case s, ok := <-cs:
            if ok {
                fmt.Println(s)
            } else {
                csClosed = true
                fmt.Println("cs closed")
            }
        default:
            fmt.Println("waiting...")
        }
    }
}  

select中有case代码块,用于channel发送或接收消息,任意一个case代码块准备好时,执行其对应内容;多个case代码块准备好时,随机选择一个case代码块并执行;所有case代码块都没有准备好,则等待;还可以有一个default代码块,所有case代码块都没有准备好时执行default代码块。

channel原理

先贴一下channel的源码地址,读者可以对照来看。

数据结构

先看channel的结构体:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    // channel中元素大小
    elemsize uint16 
    // 是否已关闭
    closed   uint32
    // channel中元素类型
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

channel的缓冲区其实是一个环形队列,qcount表示队列中元素的数量,dataqsiz表示环形队列的总大小,buf表示一个指向循环数组的指针;sendxrecvx分别用来标识当前发送和接收的元素在循环队列中的位置;recvqsendq都是一个列表,分别用于存储当前处于等待接收和等待发送的Goroutine

再看一下waitq的数据结构:

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    // 当前goroutine
    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

其中sudog表示处于等待列表中的Goroutine封装,包含了一些上下文信息,firstlast分别指向等待列表的首位的Goroutine

编译分析

在分析channel的原理之前,我们先使用go tool分析以下代码,看看channel的各种操作在底层调用了什么运行时方法:

ch := make(chan int2)
ch <- 2
ch <- 1
<-ch
n, ok := <-ch
if ok {
    fmt.Println(n)
}
close(ch)

编译

go build test.go
go tool objdump -s "main\.main" test | grep CALL

CALL过滤出来:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)
  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)
  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)
  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)
  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)
  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)
  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)
  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)
  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)

创建

从上面的编译分析可以看出在创建channel时调用了运行时方法makechan:

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    // 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, niltrue))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.kind&kindNoPointers != 0:
        // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间   hchan必需的空间。
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize mem, niltrue))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // 元素中包含指针,为hchan和缓冲区分别分配空间
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

makechan的代码逻辑还是比较简单的,首先校验元素类型和缓冲区空间大小,然后创建hchan,分配所需空间。这里有三种情况:当缓冲区大小为0,或者channel中元素大小为0时,只需分配channel必需的空间即可;当channel元素类型不是指针时,则只需要为hchan和缓冲区分配一片连续内存空间,空间大小为缓冲区数组空间加上hchan必需的空间;默认情况,缓冲区包含指针,则需要为hchan和缓冲区分别分配内存。最后更新hchan的其他字段,包括elemsizeelemtypedataqsiz

发送

channel的发送操作调用了运行时方法chansend1, 在
chansend1内部又调用了chansend,直接来看chansend的实现:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // channel为nil
    if c == nil {
        // 如果是非阻塞,直接返回发送不成功
        if !block {
            return false
        }
        // 否则,当前Goroutine阻塞挂起
        gopark(nilnil, waitReasonChanSendNilChan, traceEvGoStop,  【文末有惊喜!】一文读懂golang channel
Golang Channel原理
Go 语言为什么这么快,带你详细了解Golang CSP并发模型
Go并发编程——channel
golang goroutine 通知_深入golang之---goroutine并发控制与通信
Golang学习小结、从入门到精通资料汇总
图解 Go 并发编程
Go语言并发模型:以并行处理MD5为例
golang channel的使用以及调度原理
golang学习笔记(二)—— 深入golang中的协程

[关闭]
~ ~