Go 语言有强大的并发能力,能够简单的通过 go 关键字创建大量的轻量级协程 Goroutine,帮助程序快速执行各种任务,比Java等其他支持多线程的语言在并发方面更为强大,除了会用它,我们还需要掌握其底层原理,自己花时间把 GMP 调度器的底层源码学习一遍,才能对它有较为深刻的理解和掌握,本文是自己个人对于 Go语言 GMP 调度器(Go Scheduler)底层原理的学习笔记。
在学习 Go 语言的 GMP 调度器之前,原以为 GMP 底层原理较为复杂,要花很多时间和精力才能掌握,亲自下场学习之后,才发现其实并不复杂,只需三个多小时就足够:先花半个多小时,学习下刘丹冰Aceld 的 B 站讲解视频《Golang深入理解GPM模型》,然后花两个小时,结合《Go语言设计和实现》6.5节调度器的内容阅读下相关源码,最后,可以快速看看 GoLang-Scheduling In Go 前两篇文章的中译版,这样可以较快掌握 GMP 调度器的设计思想。
当然,如果希望理解的更加透彻,还需要仔细钻研几遍源码,并学习其他各种资料,尤其是 Go 开发者的文章,最好能够输出一篇文章,以加深头脑中神经元的连接和对事情本质的理解,本文就是这一学习思路的结果,希望能帮助到感兴趣的同学。
本文的代码基于 Go1.18.1 版本,跟 Go1.14 版本的调度器的主要逻辑相比,依然没有大的变化,目前看到的改动是调度循环的 runtime.findrunnable() 函数,抽取了多个逻辑封装成了新的方法,比如 M 从 其他 P 上偷取 G 的 runtime.stealWork()。
先给出整篇文章的结论和大纲,便于大家获取重点:
在现代的操作系统中,为了提高并发处理任务的能力,一个 CPU 核上通常会运行多个线程,多个线程的创建、切换使用、销毁开销通常较大:
1)一个内核线程的大小通常达到1M,因为需要分配内存来存放用户栈和内核栈的数据;
2)在一个线程执行系统调用(发生 IO 事件如网络请求或读写文件)不占用 CPU 时,需要及时让出 CPU,交给其他线程执行,这时会发生线程之间的切换;
3)线程在 CPU 上进行切换时,需要保持当前线程的上下文,将待执行的线程的上下文恢复到寄存器中,还需要向操作系统内核申请资源;
在高并发的情况下,大量线程的创建、使用、切换、销毁会占用大量的内存,并浪费较多的 CPU 时间在非工作任务的执行上,导致程序并发处理事务的能力降低。
为了解决传统内核级的线程的创建、切换、销毁开销较大的问题,Go 语言将线程分为了两种类型:内核级线程 M (Machine),轻量级的用户态的协程 Goroutine,至此,Go 语言调度器的三个核心概念出现了两个:
M: Machine的缩写,代表了内核线程 OS Thread,CPU调度的基本单元;
G: Goroutine的缩写,用户态、轻量级的协程,一个 G 代表了对一段需要被执行的 Go 语言程序的封装;每个 Goroutine 都有自己独立的栈存放自己程序的运行状态;分配的栈大小 2KB,可以按需扩缩容;
在早期,Go 将传统线程拆分为了 M 和 G 之后,为了充分利用轻量级的 G 的低内存占用、低切换开销的优点,会在当前一个M上绑定多个 G,某个正在运行中的 G 执行完成后,Go 调度器会将该 G 切换走,将其他可以运行的 G 放入 M 上执行,这时一个 Go 程序中只有一个 M 线程:
这个方案的优点是用户态的 G 可以快速切换,不会陷入内核态,缺点是每个 Go 程序都用不了硬件的多核加速能力,并且 G 阻塞会导致跟 G 绑定的 M 阻塞,其他 G 也用不了 M 去执行自己的程序了。
为了解决这些不足,Go 后来快速上线了多线程调度器:
每个Go程序,都有多个 M 线程对应多个 G 协程,该方案有以下缺点:
1)全局锁、中心化状态带来的锁竞争导致的性能下降; 2)M 会频繁交接 G,导致额外开销、性能下降;每个 M 都得能执行任意的 runnable 状态的 G; 3)每个 M 都需要处理内存缓存,导致大量的内存占用并影响数据局部性; 4)系统调用频繁阻塞和解除阻塞正在运行的线程,增加了额外开销;
为了解决多线程调度器的问题,Go 开发者 Dmitry Vyokov 在已有 G、M 的基础上,引入了 P 处理器,由此产生了当前 Go 中经典的 GMP 调度模型。
P:Processor的缩写,代表一个虚拟的处理器,它维护一个局部的可运行的 G 队列,可以通过 CAS 的方式无锁访问,工作线程 M 优先使用自己的局部运行队列中的 G,只有必要时才会去访问全局运行队列,这大大减少了锁冲突,提高了大量 G 的并发性。每个 G 要想真正运行起来,首先需要被分配一个 P。
如图 1.5 所示,是当前 Go 采用的 GMP 调度模型。可运行的 G 是通过处理器 P 和线程 M 绑定起来的,M 的执行是由操作系统调度器将 M 分配到 CPU 上实现的,Go 运行时调度器负责调度 G 到 M 上执行,主要在用户态运行,跟操作系统调度器在内核态运行相对应。
需要说明的是,Go 调度器也叫 Go 运行时调度器,或 Goroutine 调度器,指的是由运行时在用户态提供的多个函数组成的一种机制,目的是为了高效地调度 G 到 M上去执行。可以跟操作系统的调度器 OS Scheduler 对比来看,后者负责将 M 调度到 CPU 上运行。从操作系统层面来看,运行在用户态的 Go 程序只是一个请求和运行多个线程 M 的普通进程,操作系统不会直接跟上层的 G 打交道。
至于为什么不直接将本地队列放在 M 上、而是要放在 P 上呢? 这是因为当一个线程 M 阻塞(可能执行系统调用或 IO请求)的时候,可以将和它绑定的 P 上的 G 转移到其他线程 M 去执行,如果直接把可运行 G 组成的本地队列绑定到 M,则万一当前 M 阻塞,它拥有的 G 就不能给到其他 M 去执行了。
基于 GMP 模型的 Go 调度器的核心思想是:
在进入 GMP 调度模型的数据结构和源码之前,可以先用几张图形象的描述下 GMP 调度机制的一些场景,帮助理解 GMP 调度器为了保证公平性、可扩展性、及提高并发效率,所设计的一些机制和策略。
1)创建 G: 正在 M1 上运行的P,有一个G1,通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列;
2)G 运行完成后:M1 上的 G1 运行完成后(调用goexit()函数),M1 上运行的 Goroutine 会切换为 G0,G0 负责调度协程的切换(运行schedule() 函数),从 M1 上 P 的本地运行队列获取 G2 去执行(函数execute());注意:这里 G0 是程序启动时的线程 M(也叫M0)的系统栈表示的 G 结构体,负责 M 上 G 的调度;
3)M 上创建的 G 个数大于本地队列长度时:如果 P 本地队列最多能存 4 个G(实际上是256个),正在 M1 上运行的 G2 要通过go func()创建 6 个G,那么,前 4 个G 放在 P 本地队列中,G2 创建了第 5 个 G(G7)时,P 本地队列中前一半和 G7 一起打乱顺序放入全局队列,P 本地队列剩下的 G 往前移动,G2 创建的第 6 个 G(G8)时,放入 P 本地队列中,因为还有空间;
4)M 的自旋状态:创建新的 G 时,运行的 G 会尝试唤醒其他空闲的 M 绑定 P 去执行,如果 G2 唤醒了M2,M2 绑定了一个 P2,会先运行 M2 的 G0,这时 M2 没有从 P2 的本地队列中找到 G,会进入自旋状态(spinning),自旋状态的 M2 会尝试从全局空闲线程队列里面获取 G,放到 P2 本地队列去执行,获取的数量满足公式:n = min(len(globrunqsize)/GOMAXPROCS + 1, len(localrunsize/2)),含义是每个P应该从全局队列承担的 G 数量,为了提高效率,不能太多,要给其他 P 留点;
5)任务窃取机制:自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行,个数是其他 P 运行队列的一半;
6)G 发生系统调用时:如果 G 发生系统调度进入阻塞,其所在的 M 也会阻塞,因为会进入内核状态等待系统资源,和 M 绑定的 P 会寻找空闲的 M 执行,这是为了提高效率,不能让 P 本地队列的 G 因所在 M 进入阻塞状态而无法执行;需要说明的是,如果是 M 上的 G 进入 Channel 阻塞,则该 M 不会一起进入阻塞,因为 Channel 数据传输涉及内存拷贝,不涉及系统资源等待;
7)G 退出系统调用时:如果刚才进入系统调用的 G2 解除了阻塞,其所在的 M1 会寻找 P 去执行,优先找原来的 P,发现没有找到,则其上的 G2 会进入全局队列,等其他 M 获取执行,M1 进入空闲队列;
G 的数据结构是:
// src/runtime/runtime2.go
type g struct {
stack stack // 描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi)
stackguard0 uintptr // 用于调度器抢占式调度
_panic *_panic // 最内侧的 panic 结构体
_defer *_defer // 最内侧的 defer 延迟函数结构体
m *m // 当前 G 占用的线程,可能为空
sched gobuf // 存储 G 的调度相关的数据
atomicstatus uint32 // G 的状态
goid int64 // G 的 ID
waitreason waitReason //当状态status==Gwaiting时等待的原因
preempt bool // 抢占信号
preemptStop bool // 抢占时将状态修改成 `_Gpreempted`
preemptShrink bool // 在同步安全点收缩栈
lockedm muintptr //G 被锁定只能在这个 m 上运行
waiting *sudog // 这个 g 当前正在阻塞的 sudog 结构体
......
}
G 的主要字段有:
stack:描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi);
stackguard0: 可以用于调度器抢占式调度;preempt,preemptStop,preemptShrink跟抢占相关;
defer 和 panic:分别记录这个 G 最内侧的panic和 _defer结构体;
m:记录当前 G 占用的线程 M,可能为空;
atomicstatus:表示G 的状态;
sched:存储 G 的调度相关的数据;
goid:表示 G 的 ID,对开发者不可见;
需要展开描述的是sched 字段的 runtime.gobuf 结构体:
type gobuf struct {
sp uintptr // 栈指针
pc uintptr // 程序计数器,记录G要执行的下一条指令位置
g guintptr // 持有 runtime.gobuf 的 G
ret uintptr // 系统调用的返回值
......
}
这些字段会在调度器将当前 G 切换离开 M 和调度进入 M 执行程序时用到,栈指针 sp 和程序计数器 pc 用来存放或恢复寄存器中的值,改变程序执行的指令。
结构体 runtime.g 的 atomicstatus 字段存储了当前 G 的状态,G 可能处于以下状态:
const (
// _Gidle 表示 G 刚刚被分配并且还没有被初始化
_Gidle = iota // 0
// _Grunnable 表示 G 没有执行代码,没有栈的所有权,存储在运行队列中
_Grunnable // 1
// _Grunning 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
_Grunning // 2
// _Gsyscall 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
_Gsyscall // 3
// _Gwaiting 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
_Gwaiting // 4
// _Gdead 没有被使用,没有执行代码,可能有分配的栈
_Gdead // 6
// _Gcopystack 栈正在被拷贝,没有执行代码,不在运行队列上
_Gcopystack // 8
// _Gpreempted 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
_Gpreempted // 9
// _Gscan GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在
_Gscan = 0x1000
......
)
其中主要的六种状态是:
Gidle:G 被创建但还未完全被初始化;
Grunnable:当前 G 为可运行的,正在等待被运行;
Grunning:当前 G 正在被运行;
Gsyscall:当前 G 正在被系统调用;
Gwaiting:当前 G 正在因某个原因而等待;
Gdead:当前 G 完成了运行;
图3.1描述了G从创建到结束的生命周期中经历的各种状态变化过程:
虽然 G 在运行时中定义的状态较多且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,分别由Gwaiting、Grunnable、_Grunning 三种状态表示,运行期间大部分情况是在这三种状态来回切换:
等待中:G 正在等待某些条件满足,例如:系统调用结束等,包括 Gwaiting、Gsyscall 几个状态; 可运行:G 已经准备就绪,可以在线程 M 上运行,如果当前程序中有非常多的 G,每个 G 就可能会等待更多的时间,即 _Grunnable; 运行中:G 正在某个线程 M 上运行,即 _Grunning。
M 的数据结构是:
// src/runtime/runtime2.gotype m struct {
g0 *g // 持有调度栈的 G
gsignal *g // 处理 signal 的 g
tls [tlsSlots]uintptr // 线程本地存储 mstartfn func() // M的起始函数,go语句携带的那个函数
curg *g // 在当前线程上运行的 G
p puintptr // 执行 go 代码时持有的 p (如果没有执行则为 nil)
nextp puintptr // 用于暂存与当前 M 有潜在关联的 P
oldp puintptr // 执行系统调用前绑定的 P
spinning bool // 表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态
lockedg guintptr // 表示与当前 M 锁定的那个 G
.....
}
M 的字段众多,其中最重要的为下面几个:
g0: Go 运行时系统在启动之初创建的,用来调度其他 G 到 M 上;
mstartfn:表示M的起始函数,其实就是go 语句携带的那个函数;
curg:存放当前正在运行的 G 的指针;
p:指向当前与 M 关联的那个 P;
nextp:用于暂存于当前 M 有潜在关联的 P;
spinning:表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态;
lockedg:表示与当前M锁定的那个 G,运行时系统会把 一个 M 和一个 G 锁定,一旦锁定就只能双方相互作用,不接受第三者;
M 并没有像 G 和 P 一样的状态标记, 但可以认为一个 M 有以下的状态:
自旋中(spinning): M 正在从运行队列获取 G, 这时候 M 会拥有一个 P;
执行go代码中: M 正在执行go代码, 这时候 M 会拥有一个 P;
执行原生代码中: M 正在执行原生代码或者阻塞的syscall, 这时M并不拥有P;
休眠中: M 发现无待运行的 G 时会进入休眠, 并添加到空闲 M 链表中, 这时 M 并不拥有 P。
P 的数据结构是:
// src/runtime/runtime2.gotype p struct {
status uint32 // p 的状态 pidle/prunning/...
schedtick uint32 // 每次执行调度器调度 +1
syscalltick uint32 // 每次执行系统调用 +1
m muintptr // 关联的 m
mcache *mcache // 用于 P 所在的线程 M 的内存分配的 mcache
deferpool []*_defer // 本地 P 队列的 defer 结构体池
// 可运行的 Goroutine 队列,可无锁访问
runqhead uint32
runqtail uint32
runq [256]guintptr
// 线程下一个需要执行的 G
runnext guintptr
// 空闲的 G 队列,G 状态 status 为 _Gdead,可重新初始化使用
gFree struct {
gList
n int32
}
......
}
最主要的数据结构是 status 表示 P 的不同的状态,而 runqhead、runqtail 和 runq 三个字段表示处理器持有的运行队列,是一个长度为256的环形队列,其中存储着待执行的 G 列表,runnext 中是线程下一个需要执行的 G;gFree 存储 P 本地的状态为_Gdead 的空闲的 G,可重新初始化使用。
P 结构体中的状态 status 字段会是以下五种中的一种:
_Pidle:P 没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空;
_Prunning:被线程 M 持有,并且正在执行用户代码或者调度器;
_Psyscall:没有执行用户代码,当前线程陷入系统调用;
_Pgcstop:被线程 M 持有,当前处理器由于垃圾回收被停止;
_Pdead:当前 P 已经不被使用;
P 的五种状态之间的转化关系如图 3.2 所示:
调度器的schedt结构体存储了全局的 G 队列,空闲的 M 列表和 P 列表:
// src/runtime/runtime2.gotype schedt struct {
lock mutex // schedt的锁
midle muintptr // 空闲的M列表
nmidle int32 // 空闲的M列表的数量
nmidlelocked int32 // 被锁定正在工作的M数
mnext int64 // 下一个被创建的 M 的 ID
maxmcount int32 // 能拥有的最大数量的 M
pidle puintptr // 空闲的 P 链表
npidle uint32 // 空闲 P 数量
nmspinning uint32 // 处于自旋状态的 M 的数量
// 全局可执行的 G 列表
runq gQueue
runqsize int32 // 全局可执行 G 列表的大小
// 全局 _Gdead 状态的空闲 G 列表
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
// sudog结构的集中存储
sudoglock mutex
sudogcache *sudog
// 有效的 defer 结构池
deferlock mutex
deferpool *_defer
......
}
除了上面的四个结构体,还有一些全局变量:
// src/runtime/runtime2.go
var (
allm *m // 所有的 M
gomaxprocs int32 // P 的个数,默认为 ncpu 核数
ncpu int32
......
sched schedt // schedt 全局结构体
newprocs int32
allpLock mutex // 全局 P 队列的锁
allp []*p // 全局 P 队列,个数为 gomaxprocs
......
}
此外,src/runtime/proc.go 文件有两个全局变量:
// src/runtime/proc.go var (
m0 m // 进程启动后的初始线程
g0 g // 代表着初始线程的stack
......
)
到这里,G、M、P、schedt结构体和全局变量都描述完毕,GMP 的全部队列如下表3-1所示:
表3-1 GMP的队列
中文名 | 源码的名称 | 作用域 | 简要说明 |
---|---|---|---|
全局M列表 | runtime.allm | 运行时系统 | 存放所有M |
全局P列表 | runtime.allp | 运行时系统 | 存放所有P |
调度器中的空闲M列表 | runtime.schedt.midle | 调度器 | 存放空闲M |
调度器中的空闲P列表 | runtime.schedt.pidle | 调度器 | 存放空闲P |
调度器中的可运行G队列 | runtime.schedt.runq | 调度器 | 存放可运行G |
P的本地可运行G队列 | runtime.p.runq | 本地P | 存放当前P中的可运行G |
调度器中的空闲G列表 | runtime.schedt.gfree | 调度器 | 存放空闲的G |
P中的空闲G列表 | runtime.p.gfree | 本地P | 存放当前P中的空闲G |
Go 程序一启动,Go 的运行时 runtime 自带的调度器 scheduler 就开始启动了。
对于一个最简单的Go程序:
package main
import "fmt"
func main() {
fmt.Println("hello world")
}
通过 gdb或dlv的方式调试,会发现程序的真正入口不是在 runtime.main,对 AMD64 架构上的 Linux 和 macOS 服务器来说,分别在runtime包的 src/runtime/rt0_linux_amd64.s 和 src/runtime/rt0_darwin_amd64.s:
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8
JMP _rt0_amd64(SB)
两者均跳转到了 src/runtime/asm_amd64.s 包的 _rt0_amd64 函数:
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
_rt0_amd64 函数调用了 src/runtime/asm_arm64.s 包的 runtime·rt0_go 函数:
TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0
......
// 初始化g0
MOVD $runtime·g0(SB), g
......
// 初始化 m0
MOVD $runtime·m0(SB), R0// 绑定 g0 和 m0
MOVD g, m_g0(R0)
MOVD R0, g_m(g)
......
BL runtime·schedinit(SB) // 调度器初始化
// 创建一个新的 goroutine 来启动程序
MOVD $runtime·mainPC(SB), R0 // main函数入口
.......
BL runtime·newproc(SB) // 负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元goroutine
.......
// 开始启动调度器的调度循环
BL runtime·mstart(SB)
......
DATA runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB) // main函数入口地址
GLOBL runtime·mainPC(SB),RODATA,$8
Go程序的真正启动函数 runtime·rt0_go 主要做了几件事:
1)初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 的系统栈代表的 G 结构体,负责普通 G 在M 上的调度切换;
2)schedinit:进行各种运行时组件初始化工作,这包括我们的调度器与内存分配器、回收器的初始化;
3)newproc:负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元;
4)mstart:开始启动调度器的调度循环;
阅读 Go 调度器的源码,需要先从整体结构上对其有个把握,Go 程序启动后的调度器主逻辑如图 4.1 所示:
下面分为两部分来分析调度器的原理:调度器的启动和调度循环。
调度器启动函数在 src/runtime/proc.go 包的 schedinit() 函数:
// src/runtime/proc.go
// 调度器初始化
func schedinit() {
......
_g_ := getg()
...... // 设置机器线程数M最大为10000
sched.maxmcount = 10000
......
// 栈、内存分配器相关初始化
stackinit() // 初始化栈
mallocinit() // 初始化内存分配器
......
// 初始化当前系统线程 M0
mcommoninit(_g_.m, -1)
......
// GC初始化
gcinit()
......
// 设置P的值为GOMAXPROCS个数
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 调用procresize调整 P 列表
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
......
}
schedinit() 函数会设置 M 最大数量为10000,实际中不会达到;会分别调用stackinit() 、mallocinit() 、mcommoninit() 、gcinit() 等执行 goroutine栈初始化、进行内存分配器初始化、进行系统线程M0的初始化、进行GC垃圾回收器的初始化;接着,将 P 个数设置为 GOMAXPROCS 的值,即程序能够同时运行的最大处理器数,最后会调用 runtime.procresize()函数初始化 P 列表。
schedinit() 函数负责M、P、G 的初始化过程。M/P/G 彼此的初始化顺序遵循:mcommoninit、procresize、newproc,他们分别负责初始化 M 资源池(allm)、P 资源池(allp)、G 的运行现场(g.sched)以及调度队列(p.runq)。
mcommoninit() 函数主要负责对 M0 进行一个初步的初始化,并将其添加到 schedt 全局结构体中,这里访问 schedt 会加锁:
// src/runtime/proc.gofunc mcommoninit(mp *m, id int64) {
......
lock(&sched.lock)
if id >= 0 {
mp.id = id
} else { // mReserveID() 会返回 sched.mnext 给当前 m,并对 sched.mnext++,记录新增加的这个 M 到 schedt 全局结构体
mp.id = mReserveID()
}
......
// 添加到 allm 中
mp.alllink = allm
// 等价于 allm = mp
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
......
}
runtime.procresize()函数的逻辑是:
// src/runtime/proc.go
func procresize(nprocs int32) *p {
......
// 获取先前的 P 个数
old := gomaxprocs
......
// 如果全局变量 allp 切片中的处理器数量少于期望数量,对 allp 扩容
if nprocs > int32(len(allp)) {
// 加锁
lock(&allpLock)
if nprocs <= int32(cap(allp)) { // 如果要达到的 P 个数 nprocs 小于当前全局 P 切片到容量
allp = allp[:nprocs] // 在当前全局 P 切片上截取前 nprocs 个 P
} else { // 否则,调大了,超出全局 P 切片的容量,创建容量为 nprocs 的新的 P 切片
nallp := make([]*p, nprocs)
// 将原有的 p 复制到新创建的 nallp 中
copy(nallp, allp[:cap(allp)])
allp = nallp // 新的 nallp 切片赋值给旧的 allp
}
......
unlock(&allpLock)
}
// 使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的allp列表里的 P
for i := old; i < nprocs; i++ {
pp := allp[i] // 如果 p 是新创建的(新创建的 p 在数组中为 nil),则申请新的 P 对象
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg()
// 当前 G 的 M 上的 P 不为空,并且其 id 小于 nprocs,说明 ID 有效,则可以继续使用当前 G 的 P
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
// 继续使用当前 P, 其状态设置为 _Prunning
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
// 否则,释放当前 P 并获取 allp[0]
if _g_.m.p != 0 {
......
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
// 将处理器 allp[0] 绑定到当前 M
p := allp[0]
p.m = 0
p.status = _Pidle // P 状态设置为 _Pidle
acquirep(p) // 将allp[0]绑定到当前的 M
if trace.enabled {
traceGoStart()
}
}
mcache0 = nil
// 调用 runtime.p.destroy 释放从未使用的 P
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// 不能释放 p 本身,因为他可能在 m 进入系统调用时被引用
}
// 裁剪 allp,保证allp长度与期望处理器数量相等
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
idlepMask = idlepMask[:maskWords]
timerpMask = timerpMask[:maskWords]
unlock(&allpLock)
}
var runnablePs *p
// 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p { // 跳过当前 P
continue
}
p.status = _Pidle // 设置 P 的状态为空闲状态
if runqempty(p) {
pidleput(p) // 放入到全局结构体 schedt 的空闲 P 列表中
} else {
p.m.set(mget()) // 如果有本地任务,则为其绑定一个 M
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs // 返回所有包含本地任务的 P 链表
}
runtime.procresize() 函数 的执行过程如下:
1)如果全局变量 allp 切片中的 P 数量少于期望数量,会对切片进行扩容;
2)使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的 P;
3)通过指针将线程 m0 和处理器 allp[0] 绑定到一起;
4)调用 runtime.p.destroy 释放不再使用的 P 结构;
5)通过切片截断改变全局变量 allp 的长度,保证它与期望 P 数量相等;
6)将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局 schedt 的空闲 P 队列中;
runtime.procresize() 函数的逻辑如图 4.3 所示:
runtime.procresize() 函数调用 runtime.p.init 初始化新创建的 P:
// src/runtime/proc.go
// 初始化 P
func (pp *p) init(id int32) {
pp.id = id // p 的 id 就是它在 allp 中的索引
pp.status = _Pgcstop // 新创建的 p 处于 _Pgcstop 状态
......
// 为 P 分配 cache 对象,涉及对象分配
if pp.mcache == nil {
if id == 0 {
if mcache0 == nil {
throw("missing mcache?")
}
pp.mcache = mcache0
} else {
pp.mcache = allocmcache()
}
}
......
}
需要说明的是,mcache内存结构原来是在 M 上的,自从引入了 P 之后,就将该结构体移到了P上,这样,就不用每个 M 维护自己的内存分配 mcache,由于 P 在有 M 可以执行时才会移动到其他 M 上去,空闲的 M 无须分配内存,这种灵活性使整体线程的内存分配大大减少。
我们再回到 4.1 节对程序启动函数 runtime·rt0_go,有个动作是通过 runtime.newproc 函数创建 G,runtime.newproc 入参是 funcval 结构体函数,代表 go 关键字后面调用的函数:
// src/runtime/proc.go
// 创建G,并放入 P 的运行队列
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc() // 获取调用方 PC 寄存器值,即调用方程序要执行的下一个指令地址 // 用 g0 系统栈创建 Goroutine 对象 // 传递的参数包括 fn 函数入口地址, gp(g0),调用方 pc
systemstack(func() {
newg := newproc1(fn, gp, pc) // 调用 newproc1 获取 Goroutine 结构
_p_ := getg().m.p.ptr() // 获取当前 G 的 P
runqput(_p_, newg, true) // 将新的 G 放入 P 的本地运行队列
if mainStarted { // M 启动时唤醒新的 P 执行 G
wakep()
}
})
}
runtime.newproce函数主要是调用 runtime.newproc1 获取新的 Goroutine 结构,将新的 G 放入P的运行队列,M 启动时唤醒新的 P 执行 G。
runtime.newproce函数的逻辑如图4.4所示:
runtime.newproce1() 函数的逻辑是:
// src/runtime/proc.go
// 创建一个运行fn函数的goroutine
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
_g_ := getg() // 因为是在系统栈运行所以此时的 g 为 g0
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
acquirem() // 加锁,禁止这时 G 的 M 被抢占,因为它可以在一个局部变量中保存 P
_p_ := _g_.m.p.ptr() // 获取 P
newg := gfget(_p_) // 从 P 的空闲列表获取一个空闲的 G
if newg == nil { // 找不到则创建
newg = malg(_StackMin) // 创建一个栈大小为 2K 的 G
casgstatus(newg, _Gidle, _Gdead) // CAS 改变 G 的状态为_Gdead
allgadd(newg) // 将 _Gdead 状态的 g 添加到 allg,这样 GC 不会扫描未初始化的栈
}
......
// 计算运行空间大小,对齐
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize)
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize // 确定 SP 和参数入栈位置
spArg := sp
......
// 清理、创建并初始化 G 的运行现场
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp// 保存goexit的地址到sched.pc
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
// 初始化 G 的基本状态
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
......
// 将 G 的状态设置为_Grunnable
casgstatus(newg, _Gdead, _Grunnable)
......
// 生成唯一的goid
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
......
// 释放对 M 加的锁
releasem(_g_.m)
return newg
}
runtime.newproc1() 函数主要执行三个动作: 1)获取或者创建新的 Goroutine 结构体,会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体,新创建的 G 的状态为_Gdead; 2)将传入的参数 callergp,callerpc,fn更新到 G 的栈上,初始化 G 的相关参数; 3)将 G 状态设置为 _Grunnable 状态,返回;
runtime.newproc1() 函数的逻辑如图 4.5 所示:
runtime.newproc1() 函数主要调用 runtime.gfget() 函数 获取 G:
// src/runtime/proc.go
func gfget(_p_ *p) *g {
retry:
// 如果 P 的空闲列表 gFree 为空,sched 的的空闲列表 gFree 不为空
if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
lock(&sched.gFree.lock)
// 从 sched 的 gFree 列表中移动 32 个 G 到 P 的 gFree 中
for _p_.gFree.n < 32 {
gp := sched.gFree.stack.pop()
if gp == nil {
gp = sched.gFree.noStack.pop()
if gp == nil {
break
}
}
sched.gFree.n--
_p_.gFree.push(gp)
_p_.gFree.n++
}
unlock(&sched.gFree.lock)
goto retry
}
// 如果此时 P 的空闲列表还是为空,返回nil,说明无空闲的G
gp := _p_.gFree.pop()
if gp == nil {
return nil
}
_p_.gFree.n--
// 设置 G 的栈空间
if gp.stack.lo == 0 {
systemstack(func() {
gp.stack = stackalloc(_FixedStack)
})
gp.stackguard0 = gp.stack.lo + _StackGuard
} else {
.....
}
return gp // 从 P 的空闲列表获取 G 返回
}
runtime.gfget() 函数的主要逻辑是:当 P 的空闲列表 gFree 为空时,从 sched 持有的全局空闲列表 gFree 中移动最多 32个 G 到当前的 P 的空闲列表上;然后从 P 的 gFree 列表头返回一个 G;如果还是没有,则返回空,说明获取不到空闲的 G。
在 runtime.newproc1() 函数中,如果不存在空闲的 G,会通过 runtime.malg() 创建一个栈大小足够的新结构体:
// src/runtime/proc.go
// 创建一个新的 g 结构体
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 { // 如果申请的堆栈大小大于 0,会通过 runtime.stackalloc 分配 2KB 的栈空间
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
newg.stack = stackalloc(uint32(stacksize))
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}
回到 runtime.newproce函数,在获取到 G 后,会调用 runtime.runqput() 函数将 G 放入 P 本地队列,或全局队列:
// src/runtime/proc.go
// 将 G 放入 P 的运行队列中
func runqput(_p_ *p, gp *g, next bool) { // 保持一定的随机性,不将当前 G 设置为 P 的下一个执行的任务
if randomizeScheduler && next && fastrandn(2) == 0 {
next = false
}
if next {
retryNext:
// 将 G 放入到 P 的 runnext 变量中,作为下一个 P 执行的任务
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// 获取原来的 runnext 存储的 G,放入 P 本地运行队列,或全局队列
gp = oldnext.ptr()
}
retry:
h := atomic.LoadAcq(&_p_.runqhead) // 获取 P 环形队列的头和尾部指针
t := _p_.runqtail
// P 本地环形队列没有满,将 G 放入本地环形队列
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1)
return
}
// P 本地环形队列已满,将 G 放入全局队列
if runqputslow(_p_, gp, h, t) {
return
}
// 本地队列和全局队列没有满,则不会走到这里,否则循环尝试放入
goto retry
}
runtime.runqput() 函数的主要处理逻辑是:
1)保留一定的随机性,设置 next 为 false,即不将当前 G 设置为 P 的下一个执行的 G;
2)当 next 为 true 时,将 G 设置到 P 的 runnext 作为 P 下一个执行的任务;
3)当 next 为 false 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;
4)当处理器的本地运行队列已经没有剩余空间时,就会把本地队列中的一部分 G 和待加入的 G 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;
runtime.runqput() 函数的逻辑如图 4.6 所示:
runtime.runqputslow() 函数的逻辑如下:
// 将 G 和 P 本地队列的一部分放入全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
var batch [len(_p_.runq)/2 + 1]*g // 初始化一个本地队列长度一半 + 1 的 G 列表 batch
// 首先,从 P 本地队列中获取一部分 G 放入初始化的列表 batch
n := t - h
n = n / 2
if n != uint32(len(_p_.runq)/2) {
throw("runqputslow: queue is not full")
}
for i := uint32(0); i < n; i++ { // 将 P 本地环形队列的前一半 G 放入batch
batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
}
if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
return false
}
batch[n] = gp // 将传入的 G 放入列表 batch 的尾部
if randomizeScheduler { // 打乱 batch 列表中G的顺序
for i := uint32(1); i <= n; i++ {
j := fastrandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}
// 将 batch列表的 G 串成一个链表.
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}
var q gQueue // 将 batch 列表设置成 gQueue 队列
q.head.set(batch[0])
q.tail.set(batch[n])
// 现在把 gQueue 队列放入全局队列
lock(&sched.lock)
globrunqputbatch(&q, int32(n+1))
unlock(&sched.lock)
return true
}
runtime.runqputslow() 函数会把 P 本地环形队列的前一半 G 获取出来,跟传入的 G 组成一个列表,打乱顺序,再放入全局队列。
综上所属,用下图表示调度器启动流程:
我们再回到5.1节的程序启动流程,runtime·rt0_go 函数在调用 runtime.schedinit() 初始化好了调度器、调用 runtime.newproc()创建了main函数的 G 后,会调用runtime.mstart() 函数启动 M 去执行G。
TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
CALL runtime·mstart0(SB)
RET // not reached
runtime.mstart() 是用汇编写的,会直接调用 runtime.mstart0() 函数:
// src/runtime/proc.go
func mstart0() {
_g_ := getg()
......
// 初始化 g0 的参数
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
......
mexit(osStack)
}
runtime.mstart0() 函数主要调用 runtime.mstart1():
// src/runtime/proc.go
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// 记录当前栈帧,便于其他调用复用,当进入 schedule 之后,再也不会回到 mstart1
_g_.sched.g = guintptr(unsafe.Pointer(_g_))
_g_.sched.pc = getcallerpc()
_g_.sched.sp = getcallersp()
asminit()
minit()
// 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程
if _g_.m == &m0 {
mstartm0()
}
// 执行启动函数
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
// 如果当前 m 并非 m0,则要求绑定 p
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 准备好后,开始调度循环,永不返回
schedule()
}
runtime.mstart1() 保存调度信息后,会调用 runtime.schedule() 进入调度循环,寻找一个可执行的 G 并执行。
循环调度主逻辑如图5.1所示:
runtime.schedule() 函数的逻辑是:
// src/runtime/proc.go
func schedule() {
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
......
top:
pp := _g_.m.p.ptr()
pp.preempt = false
if sched.gcwaiting != 0 { // 如果需要 GC,不再进行调度
gcstopm()
goto top
}
if pp.runSafePointFn != 0 { // 不等于0,说明在安全点
runSafePointFn()
}
// 如果 G 所在的 M 在自旋,说明其P运行队列为空,如果不为空,则应该甩出错误
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
// 运行 P 上准备就绪的 Timer
checkTimers(pp, 0)
var gp *g
var inheritTime bool
......
if gp == nil { // 说明不在 GC
// 每调度 61 次,就检查一次全局队列,保证公平性;否则两个 Goroutine 可以通过互换,一直占领本地的 runqueue
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1) // 从全局队列中偷 g
unlock(&sched.lock)
}
}
if gp == nil {
// 从 P 的本地队列获取 G
gp, inheritTime = runqget(_g_.m.p.ptr())
}
if gp == nil {
gp, inheritTime = findrunnable() // 阻塞式查找可用 G
}
// M 这时候一定是获取到了G
// 如果 M 是自旋状态,重置其状态到非自旋
if _g_.m.spinning {
resetspinning()
}
.......
// 执行 G
execute(gp, inheritTime)
}
runtime.schedule() 函数会从下面几个地方查找待执行的 Goroutine:
1)为了保证公平,当全局运行队列中有待执行的 G 时,通过 schedtick 对 61 取模,表示每 61 次会有一次从全局的运行队列中查找对应的 G,这样可以避免两个 G 在 P 本地队列互换一直占有本地队列; 2)调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G; 3)如果前两种方法都没有找到 G,会通过 runtime.findrunnable() 进行阻塞地查找 G;
runtime.schedule 函数从全局队列获取 G 的函数是 runtime.globrunqget() 函数:
// 从全局队列获取 G
func globrunqget(_p_ *p, max int32) *g {
assertLockHeld(&sched.lock)
// 如果全局队列没有 G,则直接返回
if sched.runqsize == 0 {
return nil
}
// 计算n,表示从全局队列放入本地队列的 G 的个数
n := sched.runqsize/gomaxprocs + 1
if n > sched.runqsize {
n = sched.runqsize
}
// n 不能超过取的要获取的max个数
if max > 0 && n > max {
n = max
}
// 计算能不能用本地队列的一般放下 n 个 G,如果放不下,则 n 设为本地队列的一半
if n > int32(len(_p_.runq))/2 {
n = int32(len(_p_.runq)) / 2
}
sched.runqsize -= n
// 拿到全局队列的队头作为返回的 G
gp := sched.runq.pop()
n-- // n计数减 1
// 继续取剩下的 n-1个全局队列 G 放入本地队列
for ; n > 0; n-- {
gp1 := sched.runq.pop()
runqput(_p_, gp1, false)
}
return gp
}
runtime.globrunqget() 函数会从全局队列获取 n 个 G,第一个 G 返回给调度器去执行,剩下的 n-1 个 G 放入本地队列,其中,n一般为全局队列长度 / P处理器个数 + 1,含义是平均每个 P 应该从全局队列中承担的 G 数量,且不能超过 P 本地长度的一半。
runtime.schedule() 函数调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G:
// 从 P 本地队列中获取 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
// 如果 P 有一个 runnext,则它就是下一个要执行的 G.
next := _p_.runnext
// 如果 runnext 不为空,而 CAS 失败, 则它又可能被其他 P 偷取了,
// 因为其他 P 可以竞争机会到设置 runnext 为 0, 当前 P 只能设置该字段为非0
if next != 0 && _p_.runnext.cas(next, 0) {
return next.ptr(), true
}
for {
h := atomic.LoadAcq(&_p_.runqhead) //从本地环形队列头遍历
t := _p_.runqtail
if t == h { // 头尾指针相等,表示本地队列为空
return nil, false
}
// 获取头部指针指向的 G
gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
if atomic.CasRel(&_p_.runqhead, h, h+1) {
return gp, false
}
}
}
本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地环形队列头指针遍历本地队列,取到了则返回。
阻塞式获取 G 的 runtime.findrunnable() 函数的整个逻辑看起来比较繁琐,其实无非是按这个顺序获取 G: local -> global -> netpoll -> steal -> local -> global -> netpoll:
// 找到一个可运行的 G 去执行
// 会从其他 P 的运行队列偷取,从本地会全局队列获取,或从网络轮询器获取
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
if sched.gcwaiting != 0 { // 如果在 gc,则休眠当前 m,直到复始后回到 top
gcstopm()
goto top
}
if _p_.runSafePointFn != 0 { // 不等于0,说明在安全点
runSafePointFn()
}
now, pollUntil, _ := checkTimers(_p_, 0)
// 取本地队列 local runq,如果已经拿到,立刻返回
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局队列 global runq,如果已经拿到,立刻返回
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从 netpoll 网络轮询器中尝试获取 G,优先级比从其他 P 偷取 G 要高
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(