channel实现原理
协程A在执行过程中创建了子协程A1,A2... An,然后等待子协程执行完后退出,那么可以通过以下方式完成对子协程的控制:
- 使用channel控制子协程
- waitGroup信号量机制控制子协程
- Context上下文控制子协程
channel
简单介绍
channel主要是提供给两个协程之间消息传递的,channel分为同步和异步两种。
同步的channel缓冲区大小为1,当缓冲区存在数据,发送发阻塞,直到数据被接收方取出;当缓冲区为空,接收方阻塞,直到发送方发送数据。
异步的channel缓冲区大小大于1,当缓冲区满,发送发阻塞,直到有接收方取出数据;当缓冲区为空,接收方阻塞,直到发送方发送数据。
channel实现原理
channel数据结构(runtime/chan.go):
type hchan struct {
qcount uint // total data in the queue(循环队列元素数量)
dataqsiz uint // size of the circular queue(循环队列大小)
buf unsafe.Pointer // points to an array of dataqsiz elements(循环队列指针)
elemsize uint16 //chan中元素大小
closed uint32 //是否已经close
elemtype *_type // element type(chan元素类型)
sendx uint // send index(send在buf中索引)
recvx uint // receive index(recv在buf中索引)
recvq waitq // list of recv waiters(receive的等待队列)
sendq waitq // list of send waiters(sender等待队列)
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex //互斥锁,保护所有字段,上面注释已经讲得非常明确了
}
chan初始化
chan在初始化时,会根据容量大小选择调用makechan64
或者makechan
,其中makechan64
只是做了size检查,然后底层还是调用makechan
实现的,makechan
来生成hchan
对象。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
//编译器会检查类型是否安全
// compiler checks this but be safe.
if elem.size >= 1<<16 {//是否 >= 2^16
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// chan的size或元素的size为0,就不必创建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 竞争检测器使用此位置进行同步
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指针,分配一块连续的内存给hchan数据结构和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// hchan数据结构后面紧接着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 将元素大小、类型、容量都记录下来
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
其实就是根据不同容量和元素类型,来分配不同对象来初始化chan对象的字段,最后返回hchan对象。
send方法
send方法底层调用时会发生:
- 锁定整个channel;
- 首先判断chan是否为nil(未初始化),如果是nil,发送者会永远阻塞;
- 如果chan不为nil,并且chan满了,则直接返回;
- 如果chan被close了,会引发panic;
- 如果有recvq有等待,说明buf为空,那么直接从recvq中取出一个等待的goroutine,并将数据直接写入;
- 如果没有recvq,考虑buf空间是不是已满,如果队列未满,则将数据拷贝到buf中(通过哦memmove()函数内存拷贝);如果队列满了,那么发送者的goroutine就会加入到发送者的等待队列中并阻塞,直到被唤醒(数据被取走,或者chan被close)。
recv方法
- 如果chan为nil,接收者会被永远阻塞;
- 如果chan不为nil,但chan为空,接收者就会被阻塞,直到有sender发送数据或chan被close;
- 如果chan被close了,并且队列中没有缓存的元素,那么返回;
- 如果存在sender等待:如果是同步的channel,那么直接将sender的数据复制给reveiver,否则就从buf中;取出一个元素给reveiver,把sender的数据添加到队列尾部;
- 如果没有sender等待:如果队列中有元素,就取出一个元素给receiver;如果队列中没有元素,那么当前receiver就会被阻塞,直到它从sender中接收了数据,或者chan被close才返回。
我认为上面原理性的东西了解以下就可以,掌握它的特性即可,在生产环境中避免出现死锁或者无限制的协程创建等问题。
channel状态
channel有三种状态:
- nil,即没有初始化的状态;
- 正常状态,可以进行正常读写;
- closed已关闭
向不同的状态的channel收发数据:
- 向nil状态的channel收发,会进入阻塞状态;
- 向已关闭的channel发送数据,会引发panic;
- 向已关闭的channel接收数据,会返回缓冲值或零值。
- close nil/closed状态的channel会引发panic
当channel使用完毕,可以使用内置函数close()
函数来关闭channel。channel是引用类型,channel是可以进行垃圾回收的,所以关闭channel并不是必须的,只要没有协程引用channel,最终就会被GC处理。
在使用的过程中不要让协程阻塞到channel上,这种情况很难检测到,而且会造成channel和阻塞在channel的协程占有的资源无法被GC清理最终导致内存泄漏。
使用场景
https://zhuanlan.zhihu.com/p/408598288