【go】channel原理

channel实现原理

协程A在执行过程中创建了子协程A1,A2... An,然后等待子协程执行完后退出,那么可以通过以下方式完成对子协程的控制:

  1. 使用channel控制子协程
  2. waitGroup信号量机制控制子协程
  3. Context上下文控制子协程

channel

简单介绍

channel主要是提供给两个协程之间消息传递的,channel分为同步异步两种。

同步的channel缓冲区大小为1,当缓冲区存在数据,发送发阻塞,直到数据被接收方取出;当缓冲区为空,接收方阻塞,直到发送方发送数据。

异步的channel缓冲区大小大于1,当缓冲区满,发送发阻塞,直到有接收方取出数据;当缓冲区为空,接收方阻塞,直到发送方发送数据。

channel实现原理

channel数据结构(runtime/chan.go):

type hchan struct {
 qcount   uint           // total data in the queue(循环队列元素数量)
 dataqsiz uint           // size of the circular queue(循环队列大小)
 buf      unsafe.Pointer // points to an array of dataqsiz elements(循环队列指针)
 elemsize uint16 //chan中元素大小
 closed   uint32 //是否已经close
 elemtype *_type // element type(chan元素类型)
 sendx    uint   // send index(send在buf中索引)
 recvx    uint   // receive index(recv在buf中索引)
 recvq    waitq  // list of recv waiters(receive的等待队列)
 sendq    waitq  // list of send waiters(sender等待队列)

 // lock protects all fields in hchan, as well as several
 // fields in sudogs blocked on this channel.
 //
 // Do not change another G's status while holding this lock
 // (in particular, do not ready a G), as this can deadlock
 // with stack shrinking.
 lock mutex //互斥锁,保护所有字段,上面注释已经讲得非常明确了
}

chan初始化

chan在初始化时,会根据容量大小选择调用makechan64或者makechan,其中makechan64只是做了size检查,然后底层还是调用makechan实现的,makechan来生成hchan对象。

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    //编译器会检查类型是否安全
    // compiler checks this but be safe.
    if elem.size >= 1<<16 {//是否 >= 2^16
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
        case mem == 0:
        // chan的size或元素的size为0,就不必创建buf
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // 竞争检测器使用此位置进行同步
        c.buf = c.raceaddr()
        case elem.ptrdata == 0:
        // 元素不是指针,分配一块连续的内存给hchan数据结构和buf
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        // hchan数据结构后面紧接着就是buf
        c.buf = add(unsafe.Pointer(c), hchanSize)
        default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    // 将元素大小、类型、容量都记录下来
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

其实就是根据不同容量和元素类型,来分配不同对象来初始化chan对象的字段,最后返回hchan对象。

send方法

send方法底层调用时会发生:

  1. 锁定整个channel;
  2. 首先判断chan是否为nil(未初始化),如果是nil,发送者会永远阻塞;
  3. 如果chan不为nil,并且chan满了,则直接返回;
  4. 如果chan被close了,会引发panic;
  5. 如果有recvq有等待,说明buf为空,那么直接从recvq中取出一个等待的goroutine,并将数据直接写入;
  6. 如果没有recvq,考虑buf空间是不是已满,如果队列未满,则将数据拷贝到buf中(通过哦memmove()函数内存拷贝);如果队列满了,那么发送者的goroutine就会加入到发送者的等待队列中并阻塞,直到被唤醒(数据被取走,或者chan被close)。

recv方法

  1. 如果chan为nil,接收者会被永远阻塞;
  2. 如果chan不为nil,但chan为空,接收者就会被阻塞,直到有sender发送数据或chan被close;
  3. 如果chan被close了,并且队列中没有缓存的元素,那么返回;
  4. 如果存在sender等待:如果是同步的channel,那么直接将sender的数据复制给reveiver,否则就从buf中;取出一个元素给reveiver,把sender的数据添加到队列尾部;
  5. 如果没有sender等待:如果队列中有元素,就取出一个元素给receiver;如果队列中没有元素,那么当前receiver就会被阻塞,直到它从sender中接收了数据,或者chan被close才返回。

我认为上面原理性的东西了解以下就可以,掌握它的特性即可,在生产环境中避免出现死锁或者无限制的协程创建等问题。

channel状态

channel有三种状态:

  1. nil,即没有初始化的状态;
  2. 正常状态,可以进行正常读写;
  3. closed已关闭

向不同的状态的channel收发数据:

  1. 向nil状态的channel收发,会进入阻塞状态;
  2. 向已关闭的channel发送数据,会引发panic;
  3. 向已关闭的channel接收数据,会返回缓冲值或零值。
  4. close nil/closed状态的channel会引发panic

当channel使用完毕,可以使用内置函数close()函数来关闭channel。channel是引用类型,channel是可以进行垃圾回收的,所以关闭channel并不是必须的,只要没有协程引用channel,最终就会被GC处理。

在使用的过程中不要让协程阻塞到channel上,这种情况很难检测到,而且会造成channel和阻塞在channel的协程占有的资源无法被GC清理最终导致内存泄漏。

使用场景

https://zhuanlan.zhihu.com/p/408598288