前言
CSP
(communicating sequential processes) 指互相独立的并发实体之间通过共享的通讯管道(如channel)进行通信的并发模型,不同语言有不同的并发模型。
Java、C++ 的并发模型都是通过共享内存
实现的,非常典型的方式就是,在访问共享数据(如数组、Map等)的时候,对共享内存加锁
,因此,衍生出了许多线程安全的数据结构
。
Golang 借鉴CSP模型的一些概念作为并发模型的理论支持。大家最常听见的那句话
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。 即是由此而来
在Golang中,goroutine
作为独立的并发实体,channel
作为不同实体间数据通信的管道。
本文主要介绍 Golang 中的channel
CSP模型的优点
CSP 模型中,基于管道 通信,相对于 对共享内存加锁 属于一种程序设计上的抽象与封装
基于管道 通信,类比于生产者-消费者模型,属于一种逻辑上的解耦,相似的还有Java中线程池结构
类型
channel 主要分为 有缓冲
和无缓冲
的两种 channelch := make(chan int , 1)8 两种channel最大的区别是 ,有缓冲的channel是非阻塞
模型,无缓冲的channel是阻塞
模型
有缓冲的channel
ch := make(chan int , 1)
无缓冲的channel
ch := make(chan int )
make指定len为0时,也是一个无缓冲的channel
ch := make(chan int , 0)
只读channel和只写channel
表示channel只能被读或只能被写,通常是对channel的使用作限制
func onlyReadAndWrite(){ ch := make(chan int ,1) onlyRead(ch) onlyWrite(ch)}// 参数为只读channelfunc onlyRead(ch <-chan int ){ <- ch }// 参数为只写channelfunc onlyWrite(ch chan<- int ){ ch <- 1}
零值为nil的channel
channel的零值可以为nil 。对这样的channel发送或接收会永远阻塞。
在select语句中操作nil的channel永远都不会被select到,我们可以用这个特性来激活或者禁用case
var verbose = flag.Bool("v", false, "show verbose progress messages")
func main() { // ...start background goroutine...
// Print the results periodically.var tick <-chan time.Timeif *verbose { tick = time.Tick(500 * time.Millisecond)}var nfiles, nbytes int64
loop: for { select { case size, ok := <-fileSizes: if !ok { break loop // fileSizes was closed } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // final totals }
如果程序启动时,`-v`没有传入,则**tick** 这个channel会保持为nil,select中的case永远不会被执行# 数据结构channel运行时数据结构存放在`runtime.hchan`下,
type hchan struct { qcount uint // channel中元素个数 dataqsiz uint // channel循环队列的长度 ,make channel中的len属性 ,即缓冲区大小 buf unsafe.Pointer // channel缓冲区数据指针; elemsize uint16 // channel元素的大小,是 elem元数据类型的大小 closed uint32 elemtype *_type // sendx uint // channel的send操作处理到的位置; recvx uint // channel的recv操作处理到的位 recvq waitq // recv 等待队列(即 <- channel ) sendq waitq // send 等待队列(即 channel <- )
lock mutex }
可以看到,channel底层依然是使用了mutex互斥锁来做并发控制。 [有关Mutex可以看这篇文章](https://juejin.cn/post/7095334586019741704)看看`waitq`的结构
type waitq struct { first sudog last sudog }
是一个`sudog`结构的双向链表 ,再看看`sudog`结构
type sudog struct { g *g // 指向goroutine结构题
next sudog // 前sudog prev sudog // 后sudog
}
ch := make(chan int , 1)0
func makechan(t chantype, size int) hchan { elem := t.elem
ch := make(chan int , 1)1
if elem.size >= 1<<16 { throw("makechan: invalid channel element type") }
// 内存对齐相关 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 计算需要分配的内存大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// 检查内存大小是否超过系统限制 && 是否堆内存溢出 if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
// 当buf不包含指针类型时,那么会为channel和底层数组分配一段连续的内存空间 // sodog会从其拥有的线程中引用该对象,因此该对象无法被gc收集(不会被gc回收)
var c hchan switch { case mem == 0: // 无缓冲区channel c = (hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // buf中元素不包含指针 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // buf中元素包含指针,为hchan和buf分配内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) }
c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }
ch := make(chan int , 1)2
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock()
// 1. 如果recvq队列中有等待者,直接进入send方法 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
ch := make(chan int , 1)3
if c.qcount < c.dataqsiz { // 计算下一个可以存储数据的位置 qp := chanbuf(c, c.sendx) // 参数 ep 放入上一步计算的 qp 对应的位置上 typedmemmove(c.elemtype, qp, ep) // 更新send index && qcount c.sendx++ // 环形队列 if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
if !block { unlock(&c.lock) return false }
// 3. 阻塞channel, 直到新的接收者从channel中读数据 // 获得当前运行的goroutine指针 gp := getg() // 分配sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 }
// dosomething
// 当前sudog入发送队列 c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1) // gopark ,goroutine变为 gwaiting 状态 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 为确保往channel里发送的数据不被gc回收,sodog一直引用该对象 KeepAlive(ep)
// dosomething
// 释放sudog releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
ch := make(chan int , 1)4
func send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
ch := make(chan int , 1)5
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. 当我们从空channel读数据,会调用gopark让出当前处理器占用 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } lock(&c.lock)
// 2. 当前channel已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
// 3. 如果发送队列中有goroutine被阻塞, if sg := c.sendq.dequeue(); sg != nil { // 调用recv方法 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
// 4. 如果channel缓冲区中有数据,直接从缓冲区中读数据 if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 更新 recv索引 和 环形队列长度 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
if !block { unlock(&c.lock) return false, false } // 5. 没有阻塞的发送者 && channel缓冲区为空 ,阻塞当前goroutine gp := getg() mysg := acquireSudog()
// dosomething
// 将当前sudog压入channel的接收队列 c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// gopark 让出处理器使用权 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
ch := make(chan int , 1)6
// func recv(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 无缓冲的channel if ep != nil { // 将channel发送队列中goroutine存储的数据拷贝到目标内存地址中 recvDirect(c.elemtype, sg, ep) } } else { // 有缓冲的channel // ;将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方 qp := chanbuf(c, c.recvx) if ep != nil { // 将队列中的数据拷贝到接收方的内存地址 typedmemmove(c.elemtype, ep, qp) } // 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 当前处理器的 runnext 设置成发送数据的goroutine,在调度器下一次调度时将阻塞的发送方唤醒 goready(gp, skip+1) }
ch := make(chan int , 1)7
data, ok := <-ch // 阻塞的
ch := make(chan int , 1)8
func forRange() { ch := make(chan int, 1) go read(ch) go write(ch)
time.Sleep(time.Second) log.Println("休眠1s")
go write(ch) time.Sleep(time.Minute) }
func write(ch chan int) { for i := 0; i < 1; i++ { ch <- i log.Printf("send: [%d]", i) break } } func read(ch chan int) { for { data, ok := <-ch // 阻塞的 if ok { log.Printf("recv: [%d]", data) } else { log.Println("channel close ") break } } }
ch := make(chan int , 1)9
func readV2(ch chan int) { for { select{ case data , ok := <- ch: if ok { log.Printf("recv:[%v]",data) }else{ log.Printf("ok-false") } default: log.Println("into-default") } } }
ch := make(chan int )0
func write(ch chan int, times int) { for i := 0; i < times; i++ { ch <- i log.Printf("send: [%d]", i) break } } func read(ch chan int) { for { data, ok := <-ch // 阻塞的 if ok { log.Printf("recv: [%d]", data) } else { log.Println("channel close ") break } } }
ch := make(chan int )1原文:https://juejin.cn/post/7097617352740569102