Go-Channel的使用和底层原理(下)
 自在人生  分类:IT技术  人气:64  回帖:0  发布于1年前 收藏

前言

上篇我们讲了channel的基础操作和hchan的结构,这篇我们就channel的核心发送和接收流程的底层实现原理来展开了解,代码会很多,但是都是方便我们去理解里面流程,会对我们对理解原理有大的提升。

1:Channel发送流程

向 channel 中发送数据时,编译器在编译它时,实际调用的是src/runtime/chan.go中的chansend函数。

//  ch <- 1 发送代码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 ...
}

1.1:阻塞式发送

阻塞式发送在调用chansend函数,block=true,代码方式如下:

ch <- 10

// 编译后实际调用函数
func chansend1(c *hchan, elem unsafe.Pointer) {
 chansend(c, elem, true, getcallerpc())
}

1.2:非阻塞式发送

阻塞式发送在调用chansend函数,block=true,通过select在其阻塞时直接返回,代码方式如下:

select {
    case ch <- 10:
    ...
  default
}

//select 编译后实际调用函数
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
 return chansend(c, elem, false, getcallerpc())
}

1.3:发送逻辑

通过上面我们知道channel发送的时候分为阻塞式和非阻塞式,但是最终都是调用的chansend函数,那么就一起理一理chansend的主要逻辑

1:在chan为nil 未初始化的情况下,对于select这种非阻塞的发送,直接返回 false;对于阻塞的发送,将 goroutine 挂起

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 if c == nil {
  if !block {
   return false
  }
  gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }
 ...
}

2:非阻塞发送时,channel 不为 nil,channel 没有关闭时,并且缓冲区已满,返回false。full()函数是用来判断channel是否会发生阻塞(也就是channel容量是否满了)

 if !block && c.closed == 0 && full(c) {
  return false
 }

 func full(c *hchan) bool {
  // 如果循环队列大小为0
  if c.dataqsiz == 0 {
   // c.datasiz  == 0 没有接收者
   return c.recvq.first == nil
  }
  // 队列满了
  return c.qcount == c.dataqsiz
 }

3:下面就开始真正数据的发送流程了当然也分几种情况,首先会对channel加锁,判断channel是否关闭状态(对已关闭的channel发送数据,会panic)。然后从接收等待队列中获取一个接受者sudog,且接受者存在,那么绕过缓冲buf,直接向接受者发送数据,此时的buf一定是空的。向接收者sg发送数据的时,会唤醒等待接收的goroutine。也就是调用goready() 将等待接收的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable。具体的代码我们可以一步步追踪下去

{
  ...
 // 对channel加锁, mutex
 lock(&c.lock)
 // 判断channel是否关闭
 if c.closed != 0 {
  //释放锁
  unlock(&c.lock)
  // paninc
  panic(plainError("send on closed channel"))
 }
 // 从接收队列中获取一个接收者,且接受者不为空
 if sg := c.recvq.dequeue(); sg != nil {
  // 接收者存在,直接向该接收者发送数据,绕过channel缓冲
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
 }
}

4:缓冲区未满,缓冲区未满的情况是c.qcount < c.datasiz (缓冲区队列数量小于容量),这种发送模式类似异步,需要同时也有接收者等待接收。chanbuf是获取channel的sendx索引元素的指针值 unsafe.Pointer,然后调用typedmemmove()函数进行转换

 // 如果缓冲区未满
 if c.qcount < c.dataqsiz {
  // 找到要发送数据到循环队列buf的索引位置
  qp := chanbuf(c, c.sendx)
  ......
        // 数据拷贝到循环队列中
  typedmemmove(c.elemtype, qp, ep)
        // 将待发送数据索引加1
  c.sendx++
  if c.sendx == c.dataqsiz {
   //因为循环队列,如果到了末尾,从0开始
   c.sendx = 0
  }
        // chan中队列元素个数加1
  c.qcount++
  // 释放锁
  unlock(&c.lock)
  return true
 }

5:缓冲区已满,对于select这种非阻塞发送会直接返回false,并且recvq没有接收者,此时将会将goroutine挂起,放到sendq中,等待被唤醒。当goroutine唤醒以后,解除阻塞的状态

  // 缓冲区已满,对于select这种非阻塞调用直接返回false
    if !block {
  unlock(&c.lock)
  return false
 }

 // 下面的逻辑是将当前goroutine挂起
    // 调用 getg()方法获取当前goroutine的指针,用于绑定给一个 sudog
 gp := getg()
    // 调用 acquireSudog()方法获取一个 sudog,可能是新建的 sudog,也有可能是从缓存中获取的。设置好sudog要发送的数据和状态。比如发送的 	Channel、是否在 select 中和待发送数据的内存地址等等。
 mysg := acquireSudog()
 mysg.releasetime = 0
 if t0 != 0 {
  mysg.releasetime = -1
 }
 
 mysg.elem = ep
 mysg.waitlink = nil
 mysg.g = gp
 mysg.isSelect = false
 mysg.c = c
 gp.waiting = mysg
 gp.param = nil
    // 调用 c.sendq.enqueue 方法将配置好的 sudog 加入待发送的等待队列
 c.sendq.enqueue(mysg)
 atomic.Store8(&gp.parkingOnChan, 1)
    // 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel接收者的激活
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 // 最后,KeepAlive() 确保发送的值保持活动状态,直到接收者将其复制出来
 KeepAlive(ep)

是不是感觉整个流程还挺多的,那么结合流程图将加深我们的理解

发送流程

2:Channel接收流程

从channel 中接收数据时,编译器在编译它时,实际调用的是src/runtime/chan.go中的chanrecv函数。selected, received都是 bool类型, selected表示是否能接收到值

//  i <- ch
// i, ok <- ch
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 ...
}

1:跟发送的时候一样,在chan为nil 未初始化的情况下,对于select这种非阻塞的发送,直接返回 false;对于阻塞的发送,将 goroutine 挂起

if c == nil {
 if !block {
   return
  }
  gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
  throw("unreachable")
 }

2:非阻塞接收,通过empty()判断是无缓冲chan或者是chan中没有数据

if !block && empty(c) {
 ...
}

3:接下来就是阻塞式接收,对chan加锁,判断chan如果已经关闭,并且chan中没有数据,返回 (true,false),这里的第一个true表示chan关闭后读取的 0 值

// 已关闭, 并且channel中没有数据, 此时返回零值
if c.closed != 0 && c.qcount == 0 {
  if raceenabled {
   raceacquire(c.raceaddr())
  }
  unlock(&c.lock)
  if ep != nil {
   typedmemclr(c.elemtype, ep)
  }
  return true, false
 }

4:发送队列不为空,从发送等待队列中获取一个发送者sudog,当发送者存在不为nil,会唤醒等待发送的goroutine。也就是调用goready() 将等待发送的阻塞 goroutine 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable。此时的缓冲区一定是满的,因为有等待的发送者。对应缓冲区满的情况,从队列的头部接收数据,发送者的值添加到队列的末尾

if sg := c.sendq.dequeue(); sg != nil {
 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
 return true, true
}

5:缓冲区有数据,直接从缓冲区接收数据,然后对recvx的下标进行逻辑处理,这里跟发送的时候很像

if c.qcount > 0 {
 qp := chanbuf(c, c.recvx)
 if raceenabled {
  raceacquire(qp)
  racerelease(qp)
 }
 ...
}

6:缓冲区没有数据,如果是select这种非阻塞读取的情况,直接返回(false, false),表示获取不到数据;否则,会获取sudog绑定当前接收者goroutine,调用gopark()挂起当前接收者goroutine,等待chan的其他发送者唤醒

  // 如果是select非阻塞读取的情况,直接返回(false, false)
 if !block {
  unlock(&c.lock)
  return false, false
 }
  // 获取当前 goroutine 的指针,用于绑定给一个 sudog
 gp := getg()
 mysg := acquireSudog()
 ...
 //挂起goroutine
 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
接收流程

3:Close关闭Channel

channel的关闭close(ch)底层是调用closechan()函数,整个回收流程分为两步:1:判断是否可以关闭, 将channel关闭 2:获取代接收、发送goroutine并唤醒到ready状态

func closechan(c *hchan) {
 ...
}

1:判断channel是否为nil,如果为nil会panic,然后判断channel是否已经被关闭了,我们知道关闭已经关闭的channel也是会panic的,这两个关于channel的知识点其实就在这里

 // 如果c为nil,关闭它会panic
 if c == nil {
  panic(plainError("close of nil channel"))
 }

 lock(&c.lock)
 // 关闭已经关闭的chan会panic
 if c.closed != 0 {
  unlock(&c.lock)
  panic(plainError("close of closed channel"))
 }

2:关闭channel,将channel的closed的值设置为1,获取所有的recvq的接收者,sendq的写入者到glist,然后将glist中所有的goroutine都唤醒,关闭channel的流程结束

c.closed = 1
// 申明一个存放所有接收者和发送者goroutine的list
var glist gList

//获取recvq的接受者
for {
 sg := c.recvq.dequeue()
 if sg == nil {
  break
 }
 ...
}

//获取sendq的发送者
for {
 sg := c.sendq.dequeue()
 if sg == nil {
  break
 }
 ...
}

// 唤醒glist的goroutine到Grunnable状态
for !glist.empty() {
 gp := glist.pop()
 gp.schedlink = 0
 goready(gp, 3)
}
关闭流程

总结

Channel是在用有锁队列实现数据在不同协程之间传输数据,数据传输的方式其实就是值传递,使用时需要注意以下口诀

  1. 给一个nil channel发送数据,造成永远阻塞
  2. 从一个nil channel接收数据,造成永远阻塞
  3. 给一个已经关闭的channel发送数据,引起panic
  4. 从一个已经关闭的channel接收数据,如果缓冲区中为空,则返回一个零值
  5. 无缓冲的channel是同步的,而有缓冲的channel是非同步的

channel操作

chan为nil

关闭的chan

非空、未关闭的chan

读 <- chan

阻塞

里面的内容读完了,之后获取到的是类型的零值

阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞

写 chan <-

阻塞

panic

阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

关闭 close(chan)

panic

panic

关闭成功

关于channel的使用和底层原理就总结完了,希望对大家对加深channel的理解提供实质性帮助!

 标签: 暂无标签

讨论这个帖子(0)垃圾回帖将一律封号处理……