go version: 1.17
本文从源码层面分析channel是如何创建、发送、接收、关闭的。
找到源码位置
packagemainfuncmain(){ch:=make(chanint)ch<-1<-chselect{casech<-1:default:}select{case<-ch:default:}close(ch)}
查看汇编代码: go tool compile -S -l -N main.go
输出为(省略了不必要的代码):
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)
可以得出:
makechan
: channel的创建
chansend1
: channel的阻塞
发送
chanrecv1
: channel的阻塞
接收
selectnbsend
: channel的无阻塞
发送
selectnbrecv
: channel的无阻塞
接收
closechan
: channel的关闭
channel结构
typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}
sendq
和recvq
的数据结构如下所示:
typewaitqstruct{first*sudoglast*sudog}
创建channel
const(maxAlign=8hchanSize=unsafe.Sizeof(hchan{})+uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)))funcmakechan(t*chantype,sizeint)*hchan{elem:=t.elem(...)//这里判断elem.size*size是否越界。返回值:需要申请的空间大小以及是否越界mem,overflow:=math.MulUintptr(elem.size,uintptr(size))ifoverflow||mem>maxAlloc-hchanSize||size<0{panic(plainError("makechan:sizeoutofrange"))}//HchandoesnotcontainpointersinterestingforGCwhenelementsstoredinbufdonotcontainpointers.//bufpointsintothesameallocation,elemtypeispersistent.//SudoG'sarereferencedfromtheirowningthreadsotheycan'tbecollected.//TODO(dvyukov,rlh):Rethinkwhencollectorcanmoveallocatedobjects.varc*hchanswitch{//如果无缓存,则不需要创建bufcasemem==0://Queueorelementsizeiszero.c=(*hchan)(mallocgc(hchanSize,nil,true))//Racedetectorusesthislocationforsynchronization.c.buf=c.raceaddr()caseelem.ptrdata==0://元素不包含指针,则给hchan和buf分配一块公用的空间,并且buf紧挨着hchanc=(*hchan)(mallocgc(hchanSize+mem,nil,true))c.buf=add(unsafe.Pointer(c),hchanSize)default://元素包含指针,hchan和buf各自单独分配空间c=new(hchan)c.buf=mallocgc(mem,elem,true)}c.elemsize=uint16(elem.size)c.elemtype=elemc.dataqsiz=uint(size)lockInit(&c.lock,lockRankHchan)ifdebugChan{print("makechan:chan=",c,";elemsize=",elem.size,";dataqsiz=",size,"\n")}returnc}
hchanSize
: 意思就是按照maxAlign
倍数对齐,大于等于size(hchan)
,需要申请的最小空间 比如size(hchan)为5字节,maxAlign为8字节,转换成公式为(5 + uintptr( (-5) & (8 - 1)))=8
,只需要申请8字节,用来存放hchan
总结流程
发送数据
阻塞发送
funcchansend1(c*hchan,elemunsafe.Pointer){chansend(c,elem,true,getcallerpc())}
无阻塞发送
funcselectnbsend(c*hchan,elemunsafe.Pointer)(selectedbool){returnchansend(c,elem,false,getcallerpc())}
可以看出来这两个函数都是调用chansend
,只不过block
参数一个为true
,一个为false
而已. 所以直接分析chansend
即可:
funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{ifc==nil{if!block{returnfalse}gopark(nil,nil,waitReasonChanSendNilChan,traceEvGoStop,2)throw("unreachable")}(...)}
逻辑如下 如果是channel空的
对于非阻塞,直接返回false
对于阻塞,会调用gopark挂起,不会返回
funcchansend(c*hchan,epunsafe.Pointer,blockbool,callerpcuintptr)bool{(...)if!block&&c.closed==0&&full(c){returnfalse}(...)}
funcfull(c*hchan)bool{//c.dataqsizisimmutable(neverwrittenafterthechanneliscreated)//soitissafetoreadatanytimeduringchanneloperation.ifc.dataqsiz==0{//Assumesthatapointerreadisrelaxed-atomic.returnc.recvq.first==nil}//Assumesthatauintreadisrelaxed-atomic.returnc.qcount==c.dataqsiz}
逻辑如下 对于非阻塞,并且channel没有关闭
满足以下两种情况会直接返回false
没有缓冲区,并且当前没有接受者
有缓冲区,并且满了
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)0
看下send
函数
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)1
逻辑如下 recvq
中有接收者
则直接把数据拷贝到接收者存数据的地方
唤醒接收者的goroutine. 继续往下看:
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)2
逻辑如下 如果缓冲区没满
则先获得缓冲区中要放数据的位置
将要发送的数据拷贝到缓冲区
更新sendx
channel中元素数量+1
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)3
逻辑如下 缓冲区满了
非阻塞直接返回false
阻塞的新建sudog,并放入sendq
总结流程
接收数据
阻塞接收
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)4
无阻塞接收
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)5
可以看到,最终都是调用chanrecv
,所以直接分析chanrecv
。
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)6
逻辑如下
channel为nil
a. 非阻塞直接返回 b. 阻塞则等待
非阻塞并且 && (channel无缓冲|| channel无数据)
a. channel已关闭,返回false,false b. channel未关闭,返回true,false
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)7
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)8
逻辑如下
再次判断channel已关闭,并且channel中无数据,则直接返回false,false
sendq里有等待的sender a. channel无缓冲区,直接将sender的数据copy到ep b. channel有缓冲区,将缓冲区到数据copy到ep,再将sender的数据copy到缓冲区
唤起sender到goroutine
(...)0x002100033(main.go:5)CALLruntime.makechan(SB)0x002600038(main.go:5)MOVQAX,"".ch+32(SP)0x002b00043(main.go:6)LEAQ""..stmp_0(SB),BX0x003200050(main.go:6)PCDATA$1,$10x003200050(main.go:6)CALLruntime.chansend1(SB)0x003700055(main.go:7)MOVQ"".ch+32(SP),AX0x003c00060(main.go:7)XORLBX,BX0x003e00062(main.go:7)NOP0x004000064(main.go:7)CALLruntime.chanrecv1(SB)(...)0x006200098(main.go:10)CALLruntime.selectnbsend(SB)(...)0x008000128(main.go:15)CALLruntime.selectnbrecv(SB)(...)0x009a00154(main.go:18)CALLruntime.closechan(SB)(...)9
逻辑如下 缓冲区有数据
直接将缓冲区数据copy到ep
清空当前缓冲区刚刚读数据的地方
更新c.recvx
c.qcount--
typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}0
逻辑如下
缓冲区满了,无阻塞channel直接返回
阻塞channel的话 a. 新建sudog b. 将sudog放入recvq c. 调用gopark,等待
总结流程
关闭channel
typehchanstruct{qcountuint//目前循环队列里数据个数dataqsizuint//这个循环队列的大小bufunsafe.Pointer//为指针,指向一个大小固定的数组,用来存放channel的数据elemsizeuint16//channel中元素的大小closeduint32//channel是否关闭,1为关闭elemtype*_type//channel中元素的类型sendxuint//发送数据的索引recvxuint//接收数据的索引recvqwaitq//等待接收的队列,里面放的是goroutnesendqwaitq//等待发送的队列,里面放的是goroutne//lockprotectsallfieldsinhchan,aswellasseveral//fieldsinsudogsblockedonthischannel.////DonotchangeanotherG'sstatuswhileholdingthislock//(inparticular,donotreadyaG),asthiscandeadlock//withstackshrinking.lockmutex//此channel的互斥锁}1