首页>>后端>>Golang->Golang

Golang

时间:2023-12-01 本站 点击:0

前言

CSP (communicating sequential processes) 指互相独立的并发实体之间通过共享的通讯管道(如channel)进行通信的并发模型,不同语言有不同的并发模型。

Java、C++ 的并发模型都是通过共享内存实现的,非常典型的方式就是,在访问共享数据(如数组、Map等)的时候,对共享内存加锁,因此,衍生出了许多线程安全的数据结构

Golang 借鉴CSP模型的一些概念作为并发模型的理论支持。大家最常听见的那句话

Do not communicate by sharing memory; instead, share memory by communicating.

不要通过共享内存来通信,而要通过通信来实现内存共享。 即是由此而来

在Golang中,goroutine作为独立的并发实体,channel作为不同实体间数据通信的管道。

本文主要介绍 Golang 中的channel

CSP模型的优点

CSP 模型中,基于管道 通信,相对于 对共享内存加锁 属于一种程序设计上的抽象与封装

基于管道 通信,类比于生产者-消费者模型,属于一种逻辑上的解耦,相似的还有Java中线程池结构

类型

channel 主要分为 有缓冲无缓冲的两种 channelch := make(chan int , 1)8 两种channel最大的区别是 ,有缓冲的channel是非阻塞模型,无缓冲的channel是阻塞模型

有缓冲的channel

ch := make(chan int , 1)

无缓冲的channel

ch := make(chan int )

make指定len为0时,也是一个无缓冲的channel

ch := make(chan int , 0)

只读channel和只写channel

表示channel只能被读或只能被写,通常是对channel的使用作限制

func onlyReadAndWrite(){   ch := make(chan int ,1)   onlyRead(ch)   onlyWrite(ch)}// 参数为只读channelfunc onlyRead(ch <-chan int ){   <- ch }// 参数为只写channelfunc onlyWrite(ch chan<- int ){    ch <- 1}

零值为nil的channel

channel的零值可以为nil 。对这样的channel发送或接收会永远阻塞。

在select语句中操作nil的channel永远都不会被select到,我们可以用这个特性来激活或者禁用case

var verbose = flag.Bool("v", false, "show verbose progress messages")

func main() { // ...start background goroutine...

// Print the results periodically.var tick <-chan time.Timeif *verbose {    tick = time.Tick(500 * time.Millisecond)}var nfiles, nbytes int64

loop: for { select { case size, ok := <-fileSizes: if !ok { break loop // fileSizes was closed } nfiles++ nbytes += size case <-tick: printDiskUsage(nfiles, nbytes) } } printDiskUsage(nfiles, nbytes) // final totals }

如果程序启动时,`-v`没有传入,则**tick** 这个channel会保持为nil,select中的case永远不会被执行# 数据结构channel运行时数据结构存放在`runtime.hchan`下,

type hchan struct { qcount   uint           // channel中元素个数 dataqsiz uint           // channel循环队列的长度 ,make channel中的len属性 ,即缓冲区大小 buf      unsafe.Pointer // channel缓冲区数据指针; elemsize uint16         //  channel元素的大小,是 elem元数据类型的大小 closed   uint32 elemtype *_type //  sendx    uint   // channel的send操作处理到的位置; recvx    uint   // channel的recv操作处理到的位 recvq    waitq  // recv 等待队列(即 <- channel ) sendq    waitq  // send 等待队列(即 channel <- )

lock mutex }

可以看到,channel底层依然是使用了mutex互斥锁来做并发控制。  [有关Mutex可以看这篇文章](https://juejin.cn/post/7095334586019741704)看看`waitq`的结构

type waitq struct { first sudog last  sudog }

是一个`sudog`结构的双向链表 ,再看看`sudog`结构

type sudog struct { g *g // 指向goroutine结构题

next sudog // 前sudog prev sudog // 后sudog

}

ch := make(chan int , 1)0

func makechan(t chantype, size int) hchan { elem := t.elem

ch := make(chan int , 1)1

if elem.size >= 1<<16 { throw("makechan: invalid channel element type") }

// 内存对齐相关 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 计算需要分配的内存大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size))

// 检查内存大小是否超过系统限制 && 是否堆内存溢出  if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }

// 当buf不包含指针类型时,那么会为channel和底层数组分配一段连续的内存空间 // sodog会从其拥有的线程中引用该对象,因此该对象无法被gc收集(不会被gc回收)

var c hchan switch { case mem == 0: // 无缓冲区channel c = (hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.ptrdata == 0: // buf中元素不包含指针 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // buf中元素包含指针,为hchan和buf分配内存 c = new(hchan) c.buf = mallocgc(mem, elem, true) }

c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) return c }

ch := make(chan int , 1)2

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

lock()

// 1. 如果recvq队列中有等待者,直接进入send方法 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }

ch := make(chan int , 1)3

if c.qcount < c.dataqsiz { // 计算下一个可以存储数据的位置 qp := chanbuf(c, c.sendx) // 参数 ep 放入上一步计算的 qp 对应的位置上 typedmemmove(c.elemtype, qp, ep) // 更新send index && qcount  c.sendx++ // 环形队列 if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }

if !block { unlock(&c.lock) return false }

// 3. 阻塞channel, 直到新的接收者从channel中读数据 // 获得当前运行的goroutine指针 gp := getg() // 分配sudog mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 }

// dosomething

// 当前sudog入发送队列 c.sendq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1) // gopark ,goroutine变为 gwaiting 状态 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

// 为确保往channel里发送的数据不被gc回收,sodog一直引用该对象 KeepAlive(ep)

// dosomething

// 释放sudog releaseSudog(mysg) if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }

ch := make(chan int , 1)4

func send(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) { if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }

ch := make(chan int , 1)5

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

// 1. 当我们从空channel读数据,会调用gopark让出当前处理器占用 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }

if !block && empty(c) { if atomic.Load(&c.closed) == 0 { return } if empty(c) { if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } } lock(&c.lock)

// 2. 当前channel已经被关闭并且缓冲区中不存在任何数据,那么会清除 ep 指针中的数据并立刻返回。 if c.closed != 0 && c.qcount == 0 { unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }

// 3. 如果发送队列中有goroutine被阻塞, if sg := c.sendq.dequeue(); sg != nil { // 调用recv方法 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }

// 4. 如果channel缓冲区中有数据,直接从缓冲区中读数据 if c.qcount > 0 { qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 更新 recv索引 和 环形队列长度 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }

if !block { unlock(&c.lock) return false, false } // 5. 没有阻塞的发送者 && channel缓冲区为空 ,阻塞当前goroutine gp := getg() mysg := acquireSudog()

// dosomething

// 将当前sudog压入channel的接收队列 c.recvq.enqueue(mysg)

atomic.Store8(&gp.parkingOnChan, 1)

// gopark 让出处理器使用权 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }

ch := make(chan int , 1)6

//  func recv(c hchan, sg sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 无缓冲的channel if ep != nil { // 将channel发送队列中goroutine存储的数据拷贝到目标内存地址中 recvDirect(c.elemtype, sg, ep) } } else { // 有缓冲的channel // ;将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方 qp := chanbuf(c, c.recvx) if ep != nil { // 将队列中的数据拷贝到接收方的内存地址 typedmemmove(c.elemtype, ep, qp) } // 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 当前处理器的 runnext 设置成发送数据的goroutine,在调度器下一次调度时将阻塞的发送方唤醒 goready(gp, skip+1) }

ch := make(chan int , 1)7

data, ok := <-ch // 阻塞的

ch := make(chan int , 1)8

func forRange() { ch := make(chan int, 1) go read(ch) go write(ch)

time.Sleep(time.Second) log.Println("休眠1s")

go write(ch) time.Sleep(time.Minute) }

func write(ch chan int) { for i := 0; i < 1; i++ { ch <- i log.Printf("send: [%d]", i) break } } func read(ch chan int) { for { data, ok := <-ch // 阻塞的 if ok { log.Printf("recv: [%d]", data) } else { log.Println("channel close ") break } } }

ch := make(chan int , 1)9

func readV2(ch chan int) { for { select{ case data , ok := <- ch: if ok { log.Printf("recv:[%v]",data) }else{ log.Printf("ok-false") } default: log.Println("into-default") } } }

ch := make(chan int )0

func write(ch chan int, times int) { for i := 0; i < times; i++ { ch <- i log.Printf("send: [%d]", i) break } } func read(ch chan int) { for { data, ok := <-ch // 阻塞的 if ok { log.Printf("recv: [%d]", data) } else { log.Println("channel close ") break } } }

ch := make(chan int )1
原文:https://juejin.cn/post/7097617352740569102


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/Golang/5921.html