Advertisement

Go 源码之 Chan

阅读量:

Go 源码之 chan

目录

  • Go 源码之 chan
    • 一、简介

    • 二、源码

      • (一)hchan
      • (二)创建
      • (三)发送
      • (四)接收
      • (五)关闭
    • 三、常见问题

      • 1.为什么要使用环形队列
      • 2. 关于chan的操作
    • 总结

    • 参考资料

一、简介

在Go语言源码库中实现的数据传输与状态协调功能的核心模块是通道(chan)这一抽象数据类型。

chan的主要特点包括:

  1. 该系统支持发送与接收操作。
    2. 该系统能够处理多种数据格式。
    3. 该系统提供阻塞与非阻塞两种操作模式。

chan在 Go 语言中的实现涉及到以下几个方面:

  1. 内存管理:负责内存需求下的通道分配与管理。
  2. 并行控制:在多协程环境下负责保障安全访问。
  3. 数据传输:负责实现数据发送与接收的完整机制。

使用 chan 时需要注意以下几点:

规范地说明不同类型的通信渠道的应用,并确保其正确性

chan 在 Go 语言中起到关键作用,并非偶然现象;它负责协调不同协程之间的通信与同步,并通过一种简洁而高效的机制来确保不同协程之间能够顺畅地进行通信与同步。

二、源码

/src/runtime/chan.go

  • 一个环形队列
  • 两个双向列表
image-20230323100951975.png

(一)hchan

buf + sendx + recvx 形成环形队列

复制代码
    type hchan struct {
    	qcount   uint           // 队列中现存元素数量
    	dataqsiz uint           // 队列容量(缓冲区)
    	buf      unsafe.Pointer // 队列,指向一个动态分配的数组,用于存储 channel 中的元素
    	elemsize uint16         // 队列中元素大小
    	closed   uint32         // 0 正常	,1 关闭
    	elemtype *_type     		// 队列中元素类型,
      
    	sendx    uint   				// 队列(buf)已发送位置,当(sendx++)==dataqsiz,则从头开始发,sendx=0
    	recvx    uint   				// 队列(buf)已接收位置;
      												// 当 `sendx` 和 `recvx` 相等时,channel 中无元素,发送 / 接收 操作阻塞
      
    	recvq    waitq  				// 双向链表 ,FIFO 由 recv 行为(也就是读 <-ch)阻塞在 channel 上的 goroutine 队列
    	sendq    waitq 					// 双向链表 ,FIFO 由 send 行为 (也就是写 ch<-) 阻塞在 channel 上的 goroutine 队列
    
    	
    	lock mutex 							// 读写锁,保护hchan中的所有字段,以及waitq中所有的字段
    }
    
    // 双向链表,存储了g
    type waitq struct {
    	first *sudog  // 链表头部,协程 g 的数据结构
    	last  *sudog  // 链表尾部,协程 g 的数据结构
    }

(二)创建

ch1 := make(chan int)
ch2 := make(chan int,2)

底层都是调用了runtime.makechan()

  • 合法性校验

    • 数据类型大小校验
    • 内存溢出校验
  • 初始化为hchan

  • 初始化为无缓冲的hchan

  • 初始化为同时满足有缓冲状态和无指针元素的hchan

  • 初始化为同时满足无缓冲状态和有指针元素的hchan

  • 将其他相关参数设置为hchan,并将数据大小(dataqsize)、内存大小(elemsize)、数据类型(elemtype)以及锁机制(lock)设为默认值。

复制代码
    // 主要逻辑:合法性验证 和 分配地址空间
    // t 是指向 chantype 的指针,size 表示缓冲区大小,0表无缓冲
    func makechan(t *chantype, size int) *hchan {
    	elem := t.elem // 元素的类型
    
      // ----------- 1. 合法性验证 ----------
    	// 数据类型大小验证,大于1<<16时异常
    	if elem.size >= 1<<16 {
    		throw("makechan: invalid channel element type")
    	}
      // 内存对齐(降低寻址次数),大于最大内存(8字节数)时异常
    	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    		throw("makechan: bad alignment")
    	}
    
      // 传入的size大于堆可分配的最大内存时:内存溢出异常
    	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    	if overflow || mem > maxAlloc-hchanSize || size < 0 {
    		panic(plainError("makechan: size out of range"))
    	}
    
      // ----------- 2. 分配地址空间 ----------
      // hchanSize 为 hchan 结构大小
      // mem 为缓存区大小
      /* 根据 channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和 缓冲区,分为三种情况:
      		* 如果不存在缓冲区,分配 hchan 结构体空间,即无缓存 channel
      		* 如果 channel 存储的类型不是指针类型,分配连续地址空间,包括 hchan 结构体 + 数据
      		* 默认情况包括指针,为 hchan 和 buf 单独分配数据地址空间
      	更新 hchan 结构体的数据,包括 elemsize、elemtype 和 dataqsiz
    	*/
    	var c *hchan
    	switch {
    case mem == 0:
    	 // 创建无缓冲的 chan ,buf==0 ,初始化 hchan
       c = (*hchan)(mallocgc(hchanSize, nil, true)) // hchanSize表示空hchan需要占用的字节
       c.buf = c.raceaddr()  //  raceaddr内部实现为:return unsafe.Pointer(&c.buf)
    case elem.ptrdata == 0:
       // 有缓存区,并且队列中不存在指针,分配连续地址空间,大小为 hchanSize + mem
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       // buf指针指向空hchan占用空间的末尾
       c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
       // 队列包含指针类型
       // 为buf单独开辟mem大小的空间,用来保存所有的数据
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
    }
    
    	c.elemsize = uint16(elem.size)  			// 元素大小
    	c.elemtype = elem 										// 元素类型
    	c.dataqsiz = uint(size) 							// chan 缓存区大小
    	lockInit(&c.lock, lockRankHchan) 			// 初始化锁
    
    	if debugChan {
    		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    	}
    	return c
    }

(三)发送

ch <- 1

执行 runtime.chansend1(SB)

  • 异常检测

  • 发送给 nil 通道时会执行阻塞操作

  • 往已关闭的 channel 发送(操作)将导致 panic 错误

  • 该 channel 是否允许发送操作

  • 同步发送:当 recvq 中存在等待接收者的状态时,则立即唤醒相关处理单元并开始传输数据。

  • 异步发送:若缓存区剩余空间足够大(即 c.qcount < c.dataqsiz),则将数据块存储至缓存区中以备后续处理。

  • 阻塞发送:在所有前置条件均不满足的情况下(即 block = true 且前面的条件都不成立),执行以下操作:线程被阻塞执行任务并暂时暂停处理;随后将该操作加入 sendq 队列以便后续处理;等待队列中的任务仅在接收到相应客户端的消息后才会终止。

复制代码
    /** * @Description: 
     chansend函数主要可以归纳为四部分:
    	 异常检查、同步发送、异步发送、阻塞发送:
    * @Param:c:hchan结构;ep:发送的元素;block:是否阻塞;callerpc:
    * @return: true发送成功,false发送失败
    */
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      // ------------------------------------ 1.异常检查 ------------------------------------
    	if c == nil { 
    		if !block {
    			return false
    		}
    		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) 	// 发送到 nil chan 中,阻塞挂起
    		throw("unreachable")
    	}
    	..........
    
     																							 // 当channel不为nil,此时检查channel是否做好接收发送操作的准备,
      if !block && c.closed == 0 && full(c) { 	
    		return false															// 非阻塞且未关闭: 1. 无缓存区,recvq为空 2. 有缓冲区,但是buffer已满
    	}
    
    
    	lock(&c.lock) // 先上锁
    
    	if c.closed != 0 { // chan已经关闭,则解锁
    		unlock(&c.lock)
    		panic(plainError("send on closed channel")) 				// 往 closed chan 发送(写),则 panic
    	}
    
      // ------------------------------------  2.同步发送 ------------------------------------
      																										// recvq 中存在等待接收者,则直接唤醒并发送数据
    	if sg := c.recvq.dequeue(); sg != nil { 
    		send(c, sg, ep, func() { unlock(&c.lock) }, 3)		// recvq 等待队列取取出 sg(sudog)并唤醒并发送数据 ep
    		return true
    	}
       
      // ------------------------------------  3.异步发送 ------------------------------------
      // (有缓存区,没有等待接收者,先发到缓冲区中,等有接收者再去读)
    	if c.qcount < c.dataqsiz {  					// 存在的元素个数< 缓冲区:说明缓存区可以继续写数据
    		qp := chanbuf(c, c.sendx) 					// 获取缓存区index地址
    		typedmemmove(c.elemtype, qp, ep)		// 数据写入buffer
    		c.sendx++ 													// 发送数据的下标++
    		if c.sendx == c.dataqsiz { 					// 当发送数据的下标等于缓冲区,表数据发送完毕,从头开始
    			c.sendx = 0
    		}
    		c.qcount++ 													// 元素数量++
    		unlock(&c.lock) 										// 解锁
    		return true 												// 返回结果
    	}
    
    	if !block {
    		unlock(&c.lock) 										// 解锁
    		return false
    	}
    
      // ------------------------------------ 4. 阻塞发送 ------------------------------------
      // 当前面都不满足时(没有等待接收者,没有空闲缓冲区) 且 block = true 时,发送操作 线程阻塞 挂起,直到有接收者接收才释放:
    	gp := getg()
    	..........
      c.sendq.enqueue(mysg)  											// 将发送 的 sg 添加到 sendq 等待队列中
    	return true
    }
复制代码
    func full(c *hchan) bool {
    	if c.dataqsiz == 0 { // 无缓冲
    		return c.recvq.first == nil
    	}
    	// 有缓冲,现有元素的个数 是否等于 缓冲区容量时(缓冲区满)
    	return c.qcount == c.dataqsiz
    }
复制代码
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       ......
       if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep) //  将数据拷贝到接收变量的内存地址上
      sg.elem = nil
       }
       gp := sg.g
       unlockf()
       gp.param = unsafe.Pointer(sg)
       sg.success = true
       if sg.releasetime != 0 {
      sg.releasetime = cputicks()
       }
       goready(gp, skip+1) // 唤醒sudog协程;下一轮调度时会唤醒这个接收的 goroutine。
    }
复制代码
    // 实现了等待队列的入队操作。它将一个元素添加到等待队列的末尾,并更新队列的 first 和 last 指针
    func (q *waitq) enqueue(sgp *sudog) {
    	sgp.next = nil // 表示该元素是队列的最后一个
    	x := q.last // 将等待队列 q 中的最后一个元素(如果存在)赋值给变量 x。
    	if x == nil { // 如果队列中最后一个都没有,则队列无元素,即 x 为 nil,
    		sgp.prev = nil // 则将 sgp 元素的 prev 指针设为 nil,表示该元素是队列中的第一个元素,
    		q.first = sgp //  然后将队列的 first 和 last 指针都指向该元素,表示该元素是队列中唯一的元素。
    		q.last = sgp  // 然后直接返回,结束入队操作。
    		return
    	}
    	sgp.prev = x // sgp 是新加的最后元素,需要关联前一个元素(x为原队列中最后一个元素)
    	x.next = sgp // 设置x的的下一个元素为新加的元素
    	q.last = sgp // 设置q队列的最后一个元素
    }

(四)接收

i <- ch i, ok <- ch

执行 runtime.chanrecv1(SB) 都是调用的chanrecv()

  • 异常检查

  • 从 nil 通道中阻塞挂起接收

  • 从 closed 通道输入(即读),返回零值

  • 当前通道是否能够接收数据

  • 同步接收 :当 sendq 中存在发送者时,则立即唤醒并捕获数据。

  • 异步接收 :若 c.qcount 队列中有元素,则从而从 buf 中获取数据。

  • 阻塞接收 :当前面的条件都不满足且 block = true 时:

    • 执行 receive 操作会导致线程阻塞并挂起,
    • 然后被加入 recvq 待处理队列,
    • 直至有一个发送者到达。
复制代码
    chanrecv 函数的逻辑和 chansend 的逻辑基本一致

(五)关闭

close(ch)

closechan(c *hchan)

主要逻辑:

  • 异常检查: * 关闭 nil chan ,panic

    • 关闭 closed chan,panic
  • 请将通道chan标记为禁用状态

  • 请唤醒并调度等待队列recvq及sendq中的sudog,并确保所有响应者返回零值

复制代码
    func closechan(c *hchan) {
      // ------------------------------------ 1. 异常检查 ------------------------------------
    	if c == nil {
    		panic(plainError("close of nil channel")) 				// 关闭 nil chan ,panic
    	}
    
    	lock(&c.lock)																			// 上锁
    	if c.closed != 0 {
    		unlock(&c.lock)
    		panic(plainError("close of closed channel"))		// 关闭 closed chan,panic
    	}
    
    	c.closed = 1																			// 标识chan已经关闭
    
       // ------------------------------------ 2. 释放等待的 sudog ------------------------------------
    	var glist gList 																	// 存储 recvq、sendq 等待队列中的 sg(sudog)
    	for {
    		sg := c.recvq.dequeue()													// 将 recvq 等待队列中的  sg(sudog) 添加到 glist
    		......
    		glist.push(gp)
    	}
    	for {
    		sg := c.sendq.dequeue()													// 将 sendq 等待队列中的  sg(sudog) 添加到 glist
    		......
    		glist.push(gp)
    	}
    	unlock(&c.lock)																		// 解锁
    
      for !glist.empty() {															//依次从 glist 中弹出 sg(sudog)并唤醒执行,所有接收者收到零值
    		gp := glist.pop()
    		gp.schedlink = 0
    		goready(gp, 3)
    	}
    }

三、常见问题

1.为什么要使用环形队列

chan的内部采用了环形队列来获取数据,在发送或接收操作过程中,系统会依据sendx和recvx记录的位置信息从循环队列buf中获取数据。

所以环形队列:buf+sendx+recvx实现的,

使用环形数组实现的好处:

避免对数组进行复制或者移动操作

1,2,3

1,2,3

2,3,4

例如数组【1, 2, 3

避免内存分配和拷贝的开销,从而提高程序的性能

重复利用,避免重新分配内存

2. 关于chan的操作

image-20230328143228214.png

总结

chan 提供了一种在 goroutine 之间实现数据传输与协调的机制。该机制通过管理并发访问并实现对共享资源的访问控制,在降低可能出现的竞态条件以及潜在的死锁风险的同时,能够自然地处理异步事件触发与信号响应。如果您的应用程序需要在 goroutine 之间传递信息或数据包,则使用通道是一种高效的方式

  • 内部采用 hchan 架构(具体细节可见源码),其核心组件包括环形缓冲区、发送端双向链表、接收端双向链表以及互斥机制。
  • 当 channel 与 select 语句结合使用时,默认会调用 chansend 和 chanrecv 函数。
  • 具体而言:
    • 结构组成:包括环形缓存区域、send queue 和 recv queue;
    • 功能流程:涵盖锁定与解锁操作、阻塞与非阻塞模式、缓冲与非缓冲状态等关键环节。

参考资料

go源码之chan

有劳各位看官点赞、关注➕收藏 ,你们的支持是我最大的动力!!!
同时也欢迎大家在评论区提问、分享您的经验和见解!!!

全部评论 (0)

还没有任何评论哟~