前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入分析Go1.18 GMP调度器底层原理

深入分析Go1.18 GMP调度器底层原理

原创
作者头像
涂明光
修改2022-11-27 13:43:04
2.3K0
修改2022-11-27 13:43:04
举报
文章被收录于专栏:后台技术学习

Go 语言有强大的并发能力,能够简单的通过 go 关键字创建大量的轻量级协程 Goroutine,帮助程序快速执行各种任务,比Java等其他支持多线程的语言在并发方面更为强大,除了会用它,我们还需要掌握其底层原理,自己花时间把 GMP 调度器的底层源码学习一遍,才能对它有较为深刻的理解和掌握,本文是自己个人对于 Go语言 GMP 调度器(Go Scheduler)底层原理的学习笔记。

在学习 Go 语言的 GMP 调度器之前,原以为 GMP 底层原理较为复杂,要花很多时间和精力才能掌握,亲自下场学习之后,才发现其实并不复杂,只需三个多小时就足够:先花半个多小时,学习下刘丹冰Aceld 的 B 站讲解视频《Golang深入理解GPM模型》,然后花两个小时,结合《Go语言设计和实现》6.5节调度器的内容阅读下相关源码,最后,可以快速看看 GoLang-Scheduling In Go 前两篇文章的中译版,这样可以较快掌握 GMP 调度器的设计思想。

当然,如果希望理解的更加透彻,还需要仔细钻研几遍源码,并学习其他各种资料,尤其是 Go 开发者的文章,最好能够输出一篇文章,以加深头脑中神经元的连接和对事情本质的理解,本文就是这一学习思路的结果,希望能帮助到感兴趣的同学。

本文的代码基于 Go1.18.1 版本,跟 Go1.14 版本的调度器的主要逻辑相比,依然没有大的变化,目前看到的改动是调度循环的 runtime.findrunnable() 函数,抽取了多个逻辑封装成了新的方法,比如 M 从 其他 P 上偷取 G 的 runtime.stealWork()。

0. 结论

先给出整篇文章的结论和大纲,便于大家获取重点:

  1. 为了解决 Go 早期多线程 M 对应多协程 G 调度器的全局锁、中心化状态带来的锁竞争导致的性能下降等问题Go 开发者引入了处理器 P 结构,形成了当前经典的 GMP 调度模型;
  2. Go 调度器是指:运行时在用户态提供的多个函数组成的一种机制,目的是高效地调度 G 到 M上去执行;
  3. Go 调度器的核心思想是:尽可能复用线程 M,避免频繁的线程创建和销毁;利用多核并行能力,限制同时运行(不包含阻塞)的 M 线程数 等于 CPU 的核心数目; Work Stealing 任务窃取机制,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行;Hand Off 交接机制,为了提高效率,M 阻塞时,会将 M 上 P 的运行队列交给其他 M 执行;基于协作的抢占机制,为了保证公平性和防止 Goroutine 饥饿问题,Go 程序会保证每个 G 运行 10ms 就让出 M,交给其他 G 去执行,这个 G 运行 10ms 就让出 M 的机制,是由单独的系统监控线程通过 retake() 函数给当前的 G 发送抢占信号实现的,如果所在的 P 没有陷入系统调用且没有满,让出的 G 优先进入本地 P 队列,否则进入全局队列;基于信号的真抢占机制,Go1.14 引入了基于信号的抢占式调度机制,解决了 GC 垃圾回收和栈扫描时无法被抢占的问题;
  4. 由于数据局部性,新创建的 G 优先放入本地队列,在本地队列满了时,会将本地队列的一半 G 和新创建的 G 打乱顺序,一起放入全局队列;本地队列如果一直没有满,也不用担心,全局队列的 G 永远会有 1/61 的机会被获取到,调度循环中,优先从本地队列获取 G 执行,不过每隔61次,就会直接从全局队列获取,至于为啥是 61 次,Dmitry 的视频讲解了,就是要一个既不大又不小的数,而且不能跟其他的常见的2的幂次方的数如 64 或 48 重合;
  5. M 优先执行其所绑定的 P 的本地运行队列中的 G,如果本地队列没有 G,则会从全局队列获取,为了提高效率和负载均衡,会从全局队列获取多个 G,而不是只取一个,个数是自己应该从全局队列中承担的,globrunqsize / nprocs + 1;同样,当全局队列没有时,会从其他 M 的 P 上偷取 G 来运行,偷取的个数通常是其他 P 运行队列的一半;
  6. G 在运行时中的状态可以简化成三种:等待中Gwaiting、可运行Grunnable、运行中_Grunning,运行期间大部分情况是在这三种状态间来回切换;
  7. M 的状态可以简化为只有两种:自旋和非自旋;自旋状态,表示 M 绑定了 P 又没有获取 G;非自旋状态,表示正在执行 Go 代码中,或正在进入系统调用,或空闲;
  8. P 结构体中最重要的,是持有一个可运行 G 的长度为 256 的本地环形队列,可以通过 CAS 的方式无锁访问,跟需要加锁访问的全局队列 schedt.runq 相对应;
  9. 调度器的启动逻辑是:初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 线程的系统栈代表的 G 结构体,负责普通 G 在 M 上的调度切换 --> runtime.schedinit():负责M、P 的初始化过程,分别调用runtime.mcommoninit() 初始化 M 的全局队列allm 、调用 runtime.procresize() 初始化全局 P 队列 allp --> runtime.newproc():负责获取空闲的 G 或创建新的 G --> runtime.mstart() 启动调度循环;;
  10. 调度器的循环逻辑是:运行函数 schedule() --> 通过 runtime.globrunqget() 从全局队列、通过 runtime.runqget() 从 P 本地队列、 runtime.findrunnable 从各个地方,获取一个可执行的 G --> 调用 runtime.execute() 执行 G --> 调用 runtime.gogo() 在汇编代码层面上真正执行G --> 调用 runtime.goexit0() 执行 G 的清理工作,重新将 G 加入 P 的空闲队列 --> 调用 runtime.schedule() 进入下一次调度循环。

1. GMP调度模型的设计思想

1.1 传统多线程的问题

在现代的操作系统中,为了提高并发处理任务的能力,一个 CPU 核上通常会运行多个线程,多个线程的创建、切换使用、销毁开销通常较大:

1)一个内核线程的大小通常达到1M,因为需要分配内存来存放用户栈和内核栈的数据;

2)在一个线程执行系统调用(发生 IO 事件如网络请求或读写文件)不占用 CPU 时,需要及时让出 CPU,交给其他线程执行,这时会发生线程之间的切换;

3)线程在 CPU 上进行切换时,需要保持当前线程的上下文,将待执行的线程的上下文恢复到寄存器中,还需要向操作系统内核申请资源;

在高并发的情况下,大量线程的创建、使用、切换、销毁会占用大量的内存,并浪费较多的 CPU 时间在非工作任务的执行上,导致程序并发处理事务的能力降低。

图1.1 传统多线程之间的切换开销较大
图1.1 传统多线程之间的切换开销较大

1.2 Go语言早期引入的 GM 模型

为了解决传统内核级的线程的创建、切换、销毁开销较大的问题,Go 语言将线程分为了两种类型:内核级线程 M (Machine),轻量级的用户态的协程 Goroutine,至此,Go 语言调度器的三个核心概念出现了两个:

M: Machine的缩写,代表了内核线程 OS Thread,CPU调度的基本单元;

G: Goroutine的缩写,用户态、轻量级的协程,一个 G 代表了对一段需要被执行的 Go 语言程序的封装;每个 Goroutine 都有自己独立的栈存放自己程序的运行状态;分配的栈大小 2KB,可以按需扩缩容;

图1.2 Go将线程拆分为内核线程 M 和用户线程 G
图1.2 Go将线程拆分为内核线程 M 和用户线程 G

在早期,Go 将传统线程拆分为了 M 和 G 之后,为了充分利用轻量级的 G 的低内存占用、低切换开销的优点,会在当前一个M上绑定多个 G,某个正在运行中的 G 执行完成后,Go 调度器会将该 G 切换走,将其他可以运行的 G 放入 M 上执行,这时一个 Go 程序中只有一个 M 线程:

图1.3 多个 G 对应一个 M
图1.3 多个 G 对应一个 M

这个方案的优点是用户态的 G 可以快速切换,不会陷入内核态,缺点是每个 Go 程序都用不了硬件的多核加速能力,并且 G 阻塞会导致跟 G 绑定的 M 阻塞,其他 G 也用不了 M 去执行自己的程序了。

为了解决这些不足,Go 后来快速上线了多线程调度器:

图1.4 多个 M 对应多个 G
图1.4 多个 M 对应多个 G

每个Go程序,都有多个 M 线程对应多个 G 协程,该方案有以下缺点:

1)全局锁、中心化状态带来的锁竞争导致的性能下降; 2)M 会频繁交接 G,导致额外开销、性能下降;每个 M 都得能执行任意的 runnable 状态的 G; 3)每个 M 都需要处理内存缓存,导致大量的内存占用并影响数据局部性; 4)系统调用频繁阻塞和解除阻塞正在运行的线程,增加了额外开销;

1.3 当前高效的 GMP 模型

为了解决多线程调度器的问题,Go 开发者 Dmitry Vyokov 在已有 G、M 的基础上,引入了 P 处理器,由此产生了当前 Go 中经典的 GMP 调度模型。

P:Processor的缩写,代表一个虚拟的处理器,它维护一个局部的可运行的 G 队列,可以通过 CAS 的方式无锁访问,工作线程 M 优先使用自己的局部运行队列中的 G,只有必要时才会去访问全局运行队列,这大大减少了锁冲突,提高了大量 G 的并发性。每个 G 要想真正运行起来,首先需要被分配一个 P。

如图 1.5 所示,是当前 Go 采用的 GMP 调度模型。可运行的 G 是通过处理器 P 和线程 M 绑定起来的,M 的执行是由操作系统调度器将 M 分配到 CPU 上实现的,Go 运行时调度器负责调度 G 到 M 上执行,主要在用户态运行,跟操作系统调度器在内核态运行相对应。

图1.5 当前高效的GMP调度模型
图1.5 当前高效的GMP调度模型

需要说明的是,Go 调度器也叫 Go 运行时调度器,或 Goroutine 调度器,指的是由运行时在用户态提供的多个函数组成的一种机制,目的是为了高效地调度 G 到 M上去执行。可以跟操作系统的调度器 OS Scheduler 对比来看,后者负责将 M 调度到 CPU 上运行。从操作系统层面来看,运行在用户态的 Go 程序只是一个请求和运行多个线程 M 的普通进程,操作系统不会直接跟上层的 G 打交道。

至于为什么不直接将本地队列放在 M 上、而是要放在 P 上呢? 这是因为当一个线程 M 阻塞(可能执行系统调用或 IO请求)的时候,可以将和它绑定的 P 上的 G 转移到其他线程 M 去执行,如果直接把可运行 G 组成的本地队列绑定到 M,则万一当前 M 阻塞,它拥有的 G 就不能给到其他 M 去执行了。

基于 GMP 模型的 Go 调度器的核心思想是:

  1. 尽可能复用线程 M:避免频繁的线程创建和销毁;
  2. 利用多核并行能力:限制同时运行(不包含阻塞)的 M 线程数为 N,N 等于 CPU 的核心数目,这里通过设置 P 处理器的个数为 GOMAXPROCS 来保证,GOMAXPROCS 一般为 CPU 核数,因为 M 和 P 是一一绑定的,没有找到 P 的 M 会放入空闲 M 列表,没有找到 M 的 P 也会放入空闲 P 列表;
  3. Work Stealing 任务窃取机制:M 优先执行其所绑定的 P 的本地队列的 G,如果本地队列为空,可以从全局队列获取 G 运行,也可以从其他 M 偷取 G 来运行;为了提高并发执行的效率,M 可以从其他 M 绑定的 P 的运行队列偷取 G 执行,这种 GMP 调度模型也叫任务窃取调度模型,这里,任务就是指 G;
  4. Hand Off 交接机制:M 阻塞,会将 M 上 P 的运行队列交给其他 M 执行,交接效率要高,才能提高 Go 程序整体的并发度;
  5. 基于协作的抢占机制:每个真正运行的G,如果不被打断,将会一直运行下去,为了保证公平,防止新创建的 G 一直获取不到 M 执行造成饥饿问题,Go 程序会保证每个 G 运行10ms 就要让出 M,交给其他 G 去执行;
  6. 基于信号的真抢占机制:尽管基于协作的抢占机制能够缓解长时间 GC 导致整个程序无法工作和大多数 Goroutine 饥饿问题,但是还是有部分情况下,Go调度器有无法被抢占的情况,例如,for 循环或者垃圾回收长时间占用线程,为了解决这些问题, Go1.14 引入了基于信号的抢占式调度机制,能够解决 GC 垃圾回收和栈扫描时存在的问题。

2. 多图详解几种常见的调度场景

在进入 GMP 调度模型的数据结构和源码之前,可以先用几张图形象的描述下 GMP 调度机制的一些场景,帮助理解 GMP 调度器为了保证公平性、可扩展性、及提高并发效率,所设计的一些机制和策略。

1)创建 G: 正在 M1 上运行的P,有一个G1,通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列;

图2.1 正在M1上运行的G1通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列
图2.1 正在M1上运行的G1通过go func() 创建 G2 后,由于局部性,G2优先放入P的本地队列

2)G 运行完成后:M1 上的 G1 运行完成后(调用goexit()函数),M1 上运行的 Goroutine 会切换为 G0,G0 负责调度协程的切换(运行schedule() 函数),从 M1 上 P 的本地运行队列获取 G2 去执行(函数execute());注意:这里 G0 是程序启动时的线程 M(也叫M0)的系统栈表示的 G 结构体,负责 M 上 G 的调度;

图2.2 M1 上的 G1 运行完会切换到 P 本地队列的 G2 运行
图2.2 M1 上的 G1 运行完会切换到 P 本地队列的 G2 运行

3)M 上创建的 G 个数大于本地队列长度时:如果 P 本地队列最多能存 4 个G(实际上是256个),正在 M1 上运行的 G2 要通过go func()创建 6 个G,那么,前 4 个G 放在 P 本地队列中,G2 创建了第 5 个 G(G7)时,P 本地队列中前一半和 G7 一起打乱顺序放入全局队列,P 本地队列剩下的 G 往前移动,G2 创建的第 6 个 G(G8)时,放入 P 本地队列中,因为还有空间;

图2.3 M1上的G2要创建的G个数多于P本地队列能够存放的G个数时
图2.3 M1上的G2要创建的G个数多于P本地队列能够存放的G个数时

4)M 的自旋状态:创建新的 G 时,运行的 G 会尝试唤醒其他空闲的 M 绑定 P 去执行,如果 G2 唤醒了M2,M2 绑定了一个 P2,会先运行 M2 的 G0,这时 M2 没有从 P2 的本地队列中找到 G,会进入自旋状态(spinning),自旋状态的 M2 会尝试从全局空闲线程队列里面获取 G,放到 P2 本地队列去执行,获取的数量满足公式:n = min(len(globrunqsize)/GOMAXPROCS + 1, len(localrunsize/2)),含义是每个P应该从全局队列承担的 G 数量,为了提高效率,不能太多,要给其他 P 留点;

图2.4 创建新的 G 时,运行的G会尝试唤醒其他空闲的M绑定P去执行
图2.4 创建新的 G 时,运行的G会尝试唤醒其他空闲的M绑定P去执行

5)任务窃取机制:自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行,个数是其他 P 运行队列的一半;

图2.5 自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行
图2.5 自旋状态的 M 会寻找可运行的 G,如果全局队列为空,则会从其他 P 偷取 G 来执行

6)G 发生系统调用时:如果 G 发生系统调度进入阻塞,其所在的 M 也会阻塞,因为会进入内核状态等待系统资源,和 M 绑定的 P 会寻找空闲的 M 执行,这是为了提高效率,不能让 P 本地队列的 G 因所在 M 进入阻塞状态而无法执行;需要说明的是,如果是 M 上的 G 进入 Channel 阻塞,则该 M 不会一起进入阻塞,因为 Channel 数据传输涉及内存拷贝,不涉及系统资源等待;

图2.6 如果 G 发生阻塞,其所在的 M 也会阻塞,和 M 绑定的 P 会寻找空闲的 M 执行
图2.6 如果 G 发生阻塞,其所在的 M 也会阻塞,和 M 绑定的 P 会寻找空闲的 M 执行

7)G 退出系统调用时:如果刚才进入系统调用的 G2 解除了阻塞,其所在的 M1 会寻找 P 去执行,优先找原来的 P,发现没有找到,则其上的 G2 会进入全局队列,等其他 M 获取执行,M1 进入空闲队列;

图 2.7 当 G 解除阻塞时,所在的 M会寻找 P 去执行,如果没有找到,则 G 进入全局队列,M 进入空闲队列
图 2.7 当 G 解除阻塞时,所在的 M会寻找 P 去执行,如果没有找到,则 G 进入全局队列,M 进入空闲队列

3. GMP的数据结构和各种状态

3.1 G 的数据结构和状态

G 的数据结构是:

代码语言:javascript
复制
// src/runtime/runtime2.go
type g struct {
  stack       stack       // 描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi)
  stackguard0 uintptr     // 用于调度器抢占式调度
  _panic      *_panic     // 最内侧的 panic 结构体
  _defer      *_defer     // 最内侧的 defer 延迟函数结构体
  m           *m          // 当前 G 占用的线程,可能为空
  sched       gobuf       //  存储 G 的调度相关的数据
  atomicstatus uint32     // G 的状态
  goid         int64      //  G 的 ID
  waitreason   waitReason //当状态status==Gwaiting时等待的原因
  preempt       bool      // 抢占信号
  preemptStop   bool      // 抢占时将状态修改成 `_Gpreempted`
  preemptShrink bool      // 在同步安全点收缩栈
  lockedm        muintptr   //G 被锁定只能在这个 m 上运行
  waiting        *sudog     // 这个 g 当前正在阻塞的 sudog 结构体
  ......
}

G 的主要字段有:

stack:描述了当前 Goroutine 的栈内存范围 [stack.lo, stack.hi);

stackguard0: 可以用于调度器抢占式调度;preempt,preemptStop,preemptShrink跟抢占相关;

defer 和 panic:分别记录这个 G 最内侧的panic和 _defer结构体;

m:记录当前 G 占用的线程 M,可能为空;

atomicstatus:表示G 的状态;

sched:存储 G 的调度相关的数据;

goid:表示 G 的 ID,对开发者不可见;

需要展开描述的是sched 字段的 runtime.gobuf 结构体:

代码语言:javascript
复制
type gobuf struct {
  sp   uintptr      // 栈指针
  pc   uintptr      // 程序计数器,记录G要执行的下一条指令位置
  g    guintptr     // 持有 runtime.gobuf 的 G
  ret  uintptr      // 系统调用的返回值
  ......
}

这些字段会在调度器将当前 G 切换离开 M 和调度进入 M 执行程序时用到,栈指针 sp 和程序计数器 pc 用来存放或恢复寄存器中的值,改变程序执行的指令。

结构体 runtime.g 的 atomicstatus 字段存储了当前 G 的状态,G 可能处于以下状态:

代码语言:javascript
复制
const (
  // _Gidle 表示 G 刚刚被分配并且还没有被初始化
  _Gidle = iota // 0
  // _Grunnable 表示 G  没有执行代码,没有栈的所有权,存储在运行队列中
  _Grunnable // 1
  // _Grunning 可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
  _Grunning // 2
  // _Gsyscall 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
  _Gsyscall // 3
  // _Gwaiting 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
  _Gwaiting // 4
  // _Gdead 没有被使用,没有执行代码,可能有分配的栈
  _Gdead // 6
  // _Gcopystack 栈正在被拷贝,没有执行代码,不在运行队列上
  _Gcopystack // 8
  // _Gpreempted 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
  _Gpreempted // 9
  // _Gscan GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在
  _Gscan          = 0x1000
  ......
)

其中主要的六种状态是:

Gidle:G 被创建但还未完全被初始化;

Grunnable:当前 G 为可运行的,正在等待被运行;

Grunning:当前 G 正在被运行;

Gsyscall:当前 G 正在被系统调用;

Gwaiting:当前 G 正在因某个原因而等待;

Gdead:当前 G 完成了运行;

图3.1描述了G从创建到结束的生命周期中经历的各种状态变化过程:

图3.1 G的状态变化
图3.1 G的状态变化

虽然 G 在运行时中定义的状态较多且复杂,但是我们可以将这些不同的状态聚合成三种:等待中、可运行、运行中,分别由Gwaiting、Grunnable、_Grunning 三种状态表示,运行期间大部分情况是在这三种状态来回切换:

等待中:G 正在等待某些条件满足,例如:系统调用结束等,包括 Gwaiting、Gsyscall 几个状态; ​ 可运行:G 已经准备就绪,可以在线程 M 上运行,如果当前程序中有非常多的 G,每个 G 就可能会等待更多的时间,即 _Grunnable; ​ 运行中:G 正在某个线程 M 上运行,即 _Grunning。

3.2 M 的数据结构

M 的数据结构是:

代码语言:javascript
复制
// src/runtime/runtime2.gotype m struct {
  g0            *g          // 持有调度栈的 G
  gsignal       *g                // 处理 signal 的 g
  tls           [tlsSlots]uintptr // 线程本地存储        mstartfn      func()      // M的起始函数,go语句携带的那个函数
  curg          *g          // 在当前线程上运行的 G
  p             puintptr    // 执行 go 代码时持有的 p (如果没有执行则为 nil)
  nextp         puintptr    // 用于暂存与当前 M 有潜在关联的 P
  oldp          puintptr    // 执行系统调用前绑定的 P
  spinning      bool        // 表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态
  lockedg       guintptr    // 表示与当前 M 锁定的那个 G
  .....
}

M 的字段众多,其中最重要的为下面几个:

g0: Go 运行时系统在启动之初创建的,用来调度其他 G 到 M 上;

mstartfn:表示M的起始函数,其实就是go 语句携带的那个函数;

curg:存放当前正在运行的 G 的指针;

p:指向当前与 M 关联的那个 P;

nextp:用于暂存于当前 M 有潜在关联的 P;

spinning:表示当前 M 是否正在寻找 G,在寻找过程中 M 处于自旋状态;

lockedg:表示与当前M锁定的那个 G,运行时系统会把 一个 M 和一个 G 锁定,一旦锁定就只能双方相互作用,不接受第三者;

M 并没有像 G 和 P 一样的状态标记, 但可以认为一个 M 有以下的状态:

自旋中(spinning): M 正在从运行队列获取 G, 这时候 M 会拥有一个 P;

执行go代码中: M 正在执行go代码, 这时候 M 会拥有一个 P;

执行原生代码中: M 正在执行原生代码或者阻塞的syscall, 这时M并不拥有P;

休眠中: M 发现无待运行的 G 时会进入休眠, 并添加到空闲 M 链表中, 这时 M 并不拥有 P。

3.3 P 的数据结构

P 的数据结构是:

代码语言:javascript
复制
// src/runtime/runtime2.gotype p struct {
	status      uint32      // p 的状态 pidle/prunning/...
	schedtick   uint32      // 每次执行调度器调度 +1
	syscalltick uint32      // 每次执行系统调用 +1
	m           muintptr    // 关联的 m 
	mcache      *mcache     // 用于 P 所在的线程 M 的内存分配的 mcache
	deferpool    []*_defer  // 本地 P 队列的 defer 结构体池
	// 可运行的 Goroutine 队列,可无锁访问
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// 线程下一个需要执行的 G
	runnext guintptr
	// 空闲的 G 队列,G 状态 status 为 _Gdead,可重新初始化使用
	gFree struct {
		gList
		n int32
	}
        ......
}

最主要的数据结构是 status 表示 P 的不同的状态,而 runqhead、runqtail 和 runq 三个字段表示处理器持有的运行队列,是一个长度为256的环形队列,其中存储着待执行的 G 列表,runnext 中是线程下一个需要执行的 G;gFree 存储 P 本地的状态为_Gdead 的空闲的 G,可重新初始化使用。

P 结构体中的状态 status 字段会是以下五种中的一种:

_Pidle:P 没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空;

_Prunning:被线程 M 持有,并且正在执行用户代码或者调度器;

_Psyscall:没有执行用户代码,当前线程陷入系统调用;

_Pgcstop:被线程 M 持有,当前处理器由于垃圾回收被停止;

_Pdead:当前 P 已经不被使用;

P 的五种状态之间的转化关系如图 3.2 所示:

图3.2 P的状态变化
图3.2 P的状态变化

3.4 schedt 的数据结构

调度器的schedt结构体存储了全局的 G 队列,空闲的 M 列表和 P 列表:

代码语言:javascript
复制
// src/runtime/runtime2.gotype schedt struct {
  lock mutex            // schedt的锁
  midle        muintptr // 空闲的M列表
  nmidle       int32    // 空闲的M列表的数量
  nmidlelocked int32    // 被锁定正在工作的M数
  mnext        int64    // 下一个被创建的 M 的 ID
  maxmcount    int32    // 能拥有的最大数量的 M
  pidle      puintptr   // 空闲的 P 链表
  npidle     uint32     // 空闲 P 数量
  nmspinning uint32     // 处于自旋状态的 M 的数量
  // 全局可执行的 G 列表
  runq     gQueue
  runqsize int32        // 全局可执行 G 列表的大小
  // 全局 _Gdead 状态的空闲 G 列表
  gFree struct {
    lock    mutex
    stack   gList // Gs with stacks
    noStack gList // Gs without stacks
    n       int32
  }
  // sudog结构的集中存储
  sudoglock  mutex
  sudogcache *sudog
  // 有效的 defer 结构池
  deferlock mutex
  deferpool *_defer
        ......
}

除了上面的四个结构体,还有一些全局变量:

代码语言:javascript
复制
// src/runtime/runtime2.go
var (
  allm       *m         // 所有的 M
  gomaxprocs int32      // P 的个数,默认为 ncpu 核数
  ncpu       int32
  ......
  sched      schedt     // schedt 全局结构体
  newprocs   int32
​
  allpLock mutex       // 全局 P 队列的锁
  allp []*p            // 全局 P 队列,个数为 gomaxprocs
        ......
}

此外,src/runtime/proc.go 文件有两个全局变量:

代码语言:javascript
复制
// src/runtime/proc.go var (
  m0           m       //  进程启动后的初始线程
  g0            g      //  代表着初始线程的stack
  ......
)

到这里,G、M、P、schedt结构体和全局变量都描述完毕,GMP 的全部队列如下表3-1所示:

表3-1 GMP的队列

中文名

源码的名称

作用域

简要说明

全局M列表

runtime.allm

运行时系统

存放所有M

全局P列表

runtime.allp

运行时系统

存放所有P

调度器中的空闲M列表

runtime.schedt.midle

调度器

存放空闲M

调度器中的空闲P列表

runtime.schedt.pidle

调度器

存放空闲P

调度器中的可运行G队列

runtime.schedt.runq

调度器

存放可运行G

P的本地可运行G队列

runtime.p.runq

本地P

存放当前P中的可运行G

调度器中的空闲G列表

runtime.schedt.gfree

调度器

存放空闲的G

P中的空闲G列表

runtime.p.gfree

本地P

存放当前P中的空闲G

4. 调度器的启动

4.1 程序启动流程

Go 程序一启动,Go 的运行时 runtime 自带的调度器 scheduler 就开始启动了。

对于一个最简单的Go程序:

代码语言:javascript
复制
package main

import "fmt"

func main() {
	fmt.Println("hello world")
}

通过 gdb或dlv的方式调试,会发现程序的真正入口不是在 runtime.main,对 AMD64 架构上的 Linux 和 macOS 服务器来说,分别在runtime包的 src/runtime/rt0_linux_amd64.s 和 src/runtime/rt0_darwin_amd64.s:

代码语言:javascript
复制
TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
	JMP	_rt0_amd64(SB)
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8
	JMP	_rt0_amd64(SB)

两者均跳转到了 src/runtime/asm_amd64.s 包的 _rt0_amd64 函数:

代码语言:javascript
复制
TEXT _rt0_amd64(SB),NOSPLIT,$-8
	MOVQ	0(SP), DI	// argc
	LEAQ	8(SP), SI	// argv
	JMP	runtime·rt0_go(SB)

_rt0_amd64 函数调用了 src/runtime/asm_arm64.s 包的 runtime·rt0_go 函数:

代码语言:javascript
复制
TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0
	......
	// 初始化g0
	MOVD	$runtime·g0(SB), g
        ......
	// 初始化 m0
	MOVD	$runtime·m0(SB), R0// 绑定 g0 和 m0
	MOVD	g, m_g0(R0)
	MOVD	R0, g_m(g)
        ......
	BL	runtime·schedinit(SB)      // 调度器初始化

	// 创建一个新的 goroutine 来启动程序
	MOVD	$runtime·mainPC(SB), R0	   // main函数入口	
	.......
        BL	runtime·newproc(SB)        // 负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元goroutine
	.......

	// 开始启动调度器的调度循环
	BL	runtime·mstart(SB)
	......

DATA	runtime·mainPC+0(SB)/8,$runtime·main<ABIInternal>(SB)    // main函数入口地址
GLOBL	runtime·mainPC(SB),RODATA,$8

Go程序的真正启动函数 runtime·rt0_go 主要做了几件事:

1)初始化 g0 和 m0,并将二者互相绑定, m0 是程序启动后的初始线程,g0 是 m0 的系统栈代表的 G 结构体,负责普通 G 在M 上的调度切换;

2)schedinit:进行各种运行时组件初始化工作,这包括我们的调度器与内存分配器、回收器的初始化;

3)newproc:负责根据主函数即 main 的入口地址创建可被运行时调度的执行单元;

4)mstart:开始启动调度器的调度循环;

阅读 Go 调度器的源码,需要先从整体结构上对其有个把握,Go 程序启动后的调度器主逻辑如图 4.1 所示:

图4.1 调度器主逻辑
图4.1 调度器主逻辑

下面分为两部分来分析调度器的原理:调度器的启动和调度循环。

4.2 调度器的启动

调度器启动函数在 src/runtime/proc.go 包的 schedinit() 函数:

代码语言:javascript
复制
// src/runtime/proc.go
// 调度器初始化
func schedinit() {
	......
	_g_ := getg()   
	......        // 设置机器线程数M最大为10000
	sched.maxmcount = 10000
        ......
	// 栈、内存分配器相关初始化
	stackinit()          // 初始化栈
	mallocinit()         // 初始化内存分配器
	......
	// 初始化当前系统线程 M0
	mcommoninit(_g_.m, -1)
	......
        // GC初始化
	gcinit()
        ......
        // 设置P的值为GOMAXPROCS个数
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
        // 调用procresize调整 P 列表
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
	......
}

schedinit() 函数会设置 M 最大数量为10000,实际中不会达到;会分别调用stackinit() 、mallocinit() 、mcommoninit() 、gcinit() 等执行 goroutine栈初始化、进行内存分配器初始化、进行系统线程M0的初始化、进行GC垃圾回收器的初始化;接着,将 P 个数设置为 GOMAXPROCS 的值,即程序能够同时运行的最大处理器数,最后会调用 runtime.procresize()函数初始化 P 列表。

图4.2 runtime.schedinit() 函数逻辑
图4.2 runtime.schedinit() 函数逻辑

schedinit() 函数负责M、P、G 的初始化过程。M/P/G 彼此的初始化顺序遵循:mcommoninit、procresize、newproc,他们分别负责初始化 M 资源池(allm)、P 资源池(allp)、G 的运行现场(g.sched)以及调度队列(p.runq)。

mcommoninit() 函数主要负责对 M0 进行一个初步的初始化,并将其添加到 schedt 全局结构体中,这里访问 schedt 会加锁:

代码语言:javascript
复制
// src/runtime/proc.gofunc mcommoninit(mp *m, id int64) {
        ......
  lock(&sched.lock)
​
  if id >= 0 {
    mp.id = id
  } else { // mReserveID() 会返回 sched.mnext 给当前 m,并对 sched.mnext++,记录新增加的这个 M 到 schedt 全局结构体
    mp.id = mReserveID()
  }
​
  ......
​
  // 添加到 allm 中
  mp.alllink = allm
​
  // 等价于 allm = mp
  atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
  unlock(&sched.lock)
        ......
}

runtime.procresize()函数的逻辑是:

代码语言:javascript
复制
// src/runtime/proc.go
func procresize(nprocs int32) *p {
  ......
        // 获取先前的 P 个数
  old := gomaxprocs
  ......
  // 如果全局变量 allp 切片中的处理器数量少于期望数量,对 allp 扩容
  if nprocs > int32(len(allp)) {
    // 加锁
    lock(&allpLock)
    if nprocs <= int32(cap(allp)) { // 如果要达到的 P 个数 nprocs 小于当前全局 P 切片到容量
      allp = allp[:nprocs]    // 在当前全局 P 切片上截取前 nprocs 个 P
    } else {                        // 否则,调大了,超出全局 P 切片的容量,创建容量为 nprocs 的新的 P 切片
      nallp := make([]*p, nprocs)
      // 将原有的 p 复制到新创建的 nallp 中
      copy(nallp, allp[:cap(allp)])
      allp = nallp  // 新的 nallp 切片赋值给旧的 allp
    }
                ......
    unlock(&allpLock)
  }
​
  // 使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的allp列表里的 P
  for i := old; i < nprocs; i++ {
    pp := allp[i]                // 如果 p 是新创建的(新创建的 p 在数组中为 nil),则申请新的 P 对象
    if pp == nil {
      pp = new(p)
    }
    pp.init(i)
    atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
  }
​
  _g_ := getg()
        // 当前 G 的 M 上的 P 不为空,并且其 id 小于 nprocs,说明 ID 有效,则可以继续使用当前 G 的 P
  if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
    // 继续使用当前 P, 其状态设置为 _Prunning
    _g_.m.p.ptr().status = _Prunning
    _g_.m.p.ptr().mcache.prepareForSweep()
  } else {
    // 否则,释放当前 P 并获取 allp[0]
    if _g_.m.p != 0 {
      ......
      _g_.m.p.ptr().m = 0
    }
    _g_.m.p = 0
                // 将处理器 allp[0] 绑定到当前 M
    p := allp[0]
    p.m = 0
    p.status = _Pidle  // P 状态设置为 _Pidle 
    acquirep(p)        // 将allp[0]绑定到当前的 M
    if trace.enabled {
      traceGoStart()
    }
  }
​
  
  mcache0 = nil
​
  // 调用 runtime.p.destroy 释放从未使用的 P
  for i := nprocs; i < old; i++ {
    p := allp[i]
    p.destroy()
    // 不能释放 p 本身,因为他可能在 m 进入系统调用时被引用
  }
​
  // 裁剪 allp,保证allp长度与期望处理器数量相等
  if int32(len(allp)) != nprocs {
    lock(&allpLock)
    allp = allp[:nprocs]
    idlepMask = idlepMask[:maskWords]
    timerpMask = timerpMask[:maskWords]
    unlock(&allpLock)
  }
​
  var runnablePs *p
        // 将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局的空闲队列中
  for i := nprocs - 1; i >= 0; i-- {
    p := allp[i]
    if _g_.m.p.ptr() == p {    // 跳过当前 P
      continue
    }
    p.status = _Pidle          // 设置 P 的状态为空闲状态
    if runqempty(p) {
      pidleput(p)        // 放入到全局结构体 schedt 的空闲 P 列表中
    } else {
      p.m.set(mget())    // 如果有本地任务,则为其绑定一个 M
      p.link.set(runnablePs)
      runnablePs = p
    }
  }
  stealOrder.reset(uint32(nprocs))
  var int32p *int32 = &gomaxprocs 
  atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
  return runnablePs      // 返回所有包含本地任务的 P 链表
}

runtime.procresize() 函数 的执行过程如下:

1)如果全局变量 allp 切片中的 P 数量少于期望数量,会对切片进行扩容;

2)使用 new 创建新的 P 结构体并调用 runtime.p.init 初始化刚刚扩容的 P;

3)通过指针将线程 m0 和处理器 allp[0] 绑定到一起;

4)调用 runtime.p.destroy 释放不再使用的 P 结构;

5)通过切片截断改变全局变量 allp 的长度,保证它与期望 P 数量相等;

6)将除 allp[0] 之外的处理器 P 全部设置成 _Pidle 并加入到全局 schedt 的空闲 P 队列中;

runtime.procresize() 函数的逻辑如图 4.3 所示:

图4.3 runtime.procresize() 函数逻辑
图4.3 runtime.procresize() 函数逻辑

runtime.procresize() 函数调用 runtime.p.init 初始化新创建的 P:

代码语言:javascript
复制
// src/runtime/proc.go
// 初始化 P
func (pp *p) init(id int32) {
	pp.id = id             // p 的 id 就是它在 allp 中的索引
	pp.status = _Pgcstop   // 新创建的 p 处于 _Pgcstop 状态
	......
        // 为 P 分配 cache 对象,涉及对象分配
	if pp.mcache == nil {
		if id == 0 {
			if mcache0 == nil {
				throw("missing mcache?")
			}
			pp.mcache = mcache0
		} else {
			pp.mcache = allocmcache()
		}
	}
	......
}

需要说明的是,mcache内存结构原来是在 M 上的,自从引入了 P 之后,就将该结构体移到了P上,这样,就不用每个 M 维护自己的内存分配 mcache,由于 P 在有 M 可以执行时才会移动到其他 M 上去,空闲的 M 无须分配内存,这种灵活性使整体线程的内存分配大大减少。

4.3 怎样创建 G ?

我们再回到 4.1 节对程序启动函数 runtime·rt0_go,有个动作是通过 runtime.newproc 函数创建 G,runtime.newproc 入参是 funcval 结构体函数,代表 go 关键字后面调用的函数:

代码语言:javascript
复制
// src/runtime/proc.go
// 创建G,并放入 P 的运行队列
func newproc(fn *funcval) {
  gp := getg()
  pc := getcallerpc()    // 获取调用方 PC 寄存器值,即调用方程序要执行的下一个指令地址        // 用 g0 系统栈创建 Goroutine 对象        // 传递的参数包括 fn 函数入口地址, gp(g0),调用方 pc
  systemstack(func() {
    newg := newproc1(fn, gp, pc)  // 调用 newproc1 获取 Goroutine 结构
​
    _p_ := getg().m.p.ptr()       // 获取当前 G 的 P 
    runqput(_p_, newg, true)      // 将新的 G 放入 P 的本地运行队列
​
    if mainStarted {              // M 启动时唤醒新的 P 执行 G
      wakep()
    }
  })
}

runtime.newproce函数主要是调用 runtime.newproc1 获取新的 Goroutine 结构,将新的 G 放入P的运行队列,M 启动时唤醒新的 P 执行 G。

runtime.newproce函数的逻辑如图4.4所示:

图4.4 runtime.newproce() 函数逻辑
图4.4 runtime.newproce() 函数逻辑

runtime.newproce1() 函数的逻辑是:

代码语言:javascript
复制
// src/runtime/proc.go
// 创建一个运行fn函数的goroutine
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
  _g_ := getg()       // 因为是在系统栈运行所以此时的 g 为 g0
​
  if fn == nil {
    _g_.m.throwing = -1 // do not dump full stacks
    throw("go of nil func value")
  }
  acquirem() // 加锁,禁止这时 G 的 M 被抢占,因为它可以在一个局部变量中保存 P
​
  _p_ := _g_.m.p.ptr()         // 获取 P
  newg := gfget(_p_)           // 从 P 的空闲列表获取一个空闲的 G
  if newg == nil {             // 找不到则创建
    newg = malg(_StackMin)     // 创建一个栈大小为 2K 的 G
    casgstatus(newg, _Gidle, _Gdead)     // CAS 改变 G 的状态为_Gdead
    allgadd(newg) // 将 _Gdead 状态的 g 添加到 allg,这样 GC 不会扫描未初始化的栈
  }
  ......
        // 计算运行空间大小,对齐
  totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) 
  totalSize = alignUp(totalSize, sys.StackAlign)
  sp := newg.stack.hi - totalSize       // 确定 SP 和参数入栈位置
  spArg := sp
  ......
        // 清理、创建并初始化 G 的运行现场
  memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
  newg.sched.sp = sp
  newg.stktopsp = sp// 保存goexit的地址到sched.pc
  newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
  newg.sched.g = guintptr(unsafe.Pointer(newg))
  gostartcallfn(&newg.sched, fn)
        // 初始化 G 的基本状态
  newg.gopc = callerpc
  newg.ancestors = saveAncestors(callergp)
  newg.startpc = fn.fn
  ......
        // 将 G 的状态设置为_Grunnable
  casgstatus(newg, _Gdead, _Grunnable)
  ......
        // 生成唯一的goid
  newg.goid = int64(_p_.goidcache)
  _p_.goidcache++
  ......
        // 释放对 M 加的锁
  releasem(_g_.m)
​
  return newg
}

runtime.newproc1() 函数主要执行三个动作: 1)获取或者创建新的 Goroutine 结构体,会先从处理器的 gFree 列表中查找空闲的 Goroutine,如果不存在空闲的 Goroutine,会通过 runtime.malg 创建一个栈大小足够的新结构体,新创建的 G 的状态为_Gdead; 2)将传入的参数 callergp,callerpc,fn更新到 G 的栈上,初始化 G 的相关参数; 3)将 G 状态设置为 _Grunnable 状态,返回;

runtime.newproc1() 函数的逻辑如图 4.5 所示:

图4.5 runtime.newproc1() 函数逻辑
图4.5 runtime.newproc1() 函数逻辑

runtime.newproc1() 函数主要调用 runtime.gfget() 函数 获取 G:

代码语言:javascript
复制
// src/runtime/proc.go
func gfget(_p_ *p) *g {
retry:
        // 如果 P 的空闲列表 gFree 为空,sched 的的空闲列表 gFree 不为空
  if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
    lock(&sched.gFree.lock)
    // 从 sched 的 gFree 列表中移动 32 个 G 到 P 的 gFree 中
    for _p_.gFree.n < 32 {
      gp := sched.gFree.stack.pop()
      if gp == nil {
        gp = sched.gFree.noStack.pop()
        if gp == nil {
          break
        }
      }
      sched.gFree.n--
      _p_.gFree.push(gp)
      _p_.gFree.n++
    }
    unlock(&sched.gFree.lock)
    goto retry
  }
        // 如果此时 P 的空闲列表还是为空,返回nil,说明无空闲的G
  gp := _p_.gFree.pop()
  if gp == nil {
    return nil
  }
  _p_.gFree.n--
        // 设置 G 的栈空间
  if gp.stack.lo == 0 {
    systemstack(func() {
      gp.stack = stackalloc(_FixedStack)
    })
    gp.stackguard0 = gp.stack.lo + _StackGuard
  } else {
    .....
  }
  return gp   // 从 P 的空闲列表获取 G 返回
}

runtime.gfget() 函数的主要逻辑是:当 P 的空闲列表 gFree 为空时,从 sched 持有的全局空闲列表 gFree 中移动最多 32个 G 到当前的 P 的空闲列表上;然后从 P 的 gFree 列表头返回一个 G;如果还是没有,则返回空,说明获取不到空闲的 G。

在 runtime.newproc1() 函数中,如果不存在空闲的 G,会通过 runtime.malg() 创建一个栈大小足够的新结构体:

代码语言:javascript
复制
// src/runtime/proc.go
// 创建一个新的 g 结构体 
func malg(stacksize int32) *g {
  newg := new(g)
  if stacksize >= 0 {     // 如果申请的堆栈大小大于 0,会通过 runtime.stackalloc 分配 2KB 的栈空间
    stacksize = round2(_StackSystem + stacksize)
    systemstack(func() {
      newg.stack = stackalloc(uint32(stacksize))
    })
    newg.stackguard0 = newg.stack.lo + _StackGuard
    newg.stackguard1 = ^uintptr(0)
    *(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
  }
  return newg
}

回到 runtime.newproce函数,在获取到 G 后,会调用 runtime.runqput() 函数将 G 放入 P 本地队列,或全局队列:

代码语言:javascript
复制
// src/runtime/proc.go
// 将 G 放入 P 的运行队列中
func runqput(_p_ *p, gp *g, next bool) {        // 保持一定的随机性,不将当前 G 设置为 P 的下一个执行的任务
  if randomizeScheduler && next && fastrandn(2) == 0 {
    next = false
  }
​
  if next {
  retryNext:
               // 将 G 放入到 P 的 runnext 变量中,作为下一个 P 执行的任务
    oldnext := _p_.runnext
    if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
      goto retryNext
    }
    if oldnext == 0 {
      return
    }
    // 获取原来的 runnext 存储的 G,放入 P 本地运行队列,或全局队列
    gp = oldnext.ptr()
  }
​
retry:
  h := atomic.LoadAcq(&_p_.runqhead) // 获取 P 环形队列的头和尾部指针
  t := _p_.runqtail
        // P 本地环形队列没有满,将 G 放入本地环形队列
  if t-h < uint32(len(_p_.runq)) {
    _p_.runq[t%uint32(len(_p_.runq))].set(gp)
    atomic.StoreRel(&_p_.runqtail, t+1) 
    return
  }
        // P 本地环形队列已满,将 G 放入全局队列
  if runqputslow(_p_, gp, h, t) {
    return
  }
  // 本地队列和全局队列没有满,则不会走到这里,否则循环尝试放入
  goto retry
}

runtime.runqput() 函数的主要处理逻辑是:

1)保留一定的随机性,设置 next 为 false,即不将当前 G 设置为 P 的下一个执行的 G;

2)当 next 为 true 时,将 G 设置到 P 的 runnext 作为 P 下一个执行的任务;

3)当 next 为 false 并且本地运行队列还有剩余空间时,将 Goroutine 加入处理器持有的本地运行队列;

4)当处理器的本地运行队列已经没有剩余空间时,就会把本地队列中的一部分 G 和待加入的 G 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;

runtime.runqput() 函数的逻辑如图 4.6 所示:

图4.6 runtime.runqput() 函数的逻辑
图4.6 runtime.runqput() 函数的逻辑

runtime.runqputslow() 函数的逻辑如下:

代码语言:javascript
复制
// 将 G 和 P 本地队列的一部分放入全局队列
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
  var batch [len(_p_.runq)/2 + 1]*g   // 初始化一个本地队列长度一半 + 1 的 G 列表 batch
​
  // 首先,从 P 本地队列中获取一部分 G 放入初始化的列表 batch
  n := t - h
  n = n / 2
  if n != uint32(len(_p_.runq)/2) {
    throw("runqputslow: queue is not full")
  }
  for i := uint32(0); i < n; i++ { // 将 P 本地环形队列的前一半 G 放入batch
    batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
  }
  if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
    return false
  }
  batch[n] = gp    // 将传入的 G 放入列表 batch 的尾部
​
  if randomizeScheduler {     // 打乱 batch 列表中G的顺序
    for i := uint32(1); i <= n; i++ {
      j := fastrandn(i + 1)
      batch[i], batch[j] = batch[j], batch[i]
    }
  }
​
  // 将 batch列表的 G 串成一个链表.
  for i := uint32(0); i < n; i++ {
    batch[i].schedlink.set(batch[i+1])
  }
  var q gQueue     // 将 batch 列表设置成 gQueue 队列
  q.head.set(batch[0])
  q.tail.set(batch[n])
​
  // 现在把 gQueue 队列放入全局队列
  lock(&sched.lock)
  globrunqputbatch(&q, int32(n+1))
  unlock(&sched.lock)
  return true
}

runtime.runqputslow() 函数会把 P 本地环形队列的前一半 G 获取出来,跟传入的 G 组成一个列表,打乱顺序,再放入全局队列。

综上所属,用下图表示调度器启动流程:

图4.7 调度器启动流程
图4.7 调度器启动流程

5. 调度循环

我们再回到5.1节的程序启动流程,runtime·rt0_go 函数在调用 runtime.schedinit() 初始化好了调度器、调用 runtime.newproc()创建了main函数的 G 后,会调用runtime.mstart() 函数启动 M 去执行G。

代码语言:javascript
复制
TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
  CALL  runtime·mstart0(SB)
  RET // not reached

runtime.mstart() 是用汇编写的,会直接调用 runtime.mstart0() 函数:

代码语言:javascript
复制
// src/runtime/proc.go
func mstart0() {
  _g_ := getg()
  ......
        // 初始化 g0 的参数
  _g_.stackguard0 = _g_.stack.lo + _StackGuard
  _g_.stackguard1 = _g_.stackguard0
  mstart1()
​
  ......
  mexit(osStack)
}

runtime.mstart0() 函数主要调用 runtime.mstart1():

代码语言:javascript
复制
// src/runtime/proc.go
func mstart1() {
  _g_ := getg()
​
  if _g_ != _g_.m.g0 {
    throw("bad runtime·mstart")
  }
​
  // 记录当前栈帧,便于其他调用复用,当进入 schedule 之后,再也不会回到 mstart1
  _g_.sched.g = guintptr(unsafe.Pointer(_g_))
  _g_.sched.pc = getcallerpc()
  _g_.sched.sp = getcallersp()
​
  asminit()
  minit()
​
  // 设置信号 handler;在 minit 之后,因为 minit 可以准备处理信号的的线程
  if _g_.m == &m0 {
    mstartm0()
  }
        // 执行启动函数
  if fn := _g_.m.mstartfn; fn != nil {
    fn()
  }
        // 如果当前 m 并非 m0,则要求绑定 p
  if _g_.m != &m0 {
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
  }
        // 准备好后,开始调度循环,永不返回
  schedule()
}

runtime.mstart1() 保存调度信息后,会调用 runtime.schedule() 进入调度循环,寻找一个可执行的 G 并执行。

循环调度主逻辑如图5.1所示:

图5.1 循环调度主逻辑
图5.1 循环调度主逻辑

runtime.schedule() 函数的逻辑是:

代码语言:javascript
复制
// src/runtime/proc.go
func schedule() {
  _g_ := getg()
​
  if _g_.m.locks != 0 {
    throw("schedule: holding locks")
  }
  ......
top:
  pp := _g_.m.p.ptr()
  pp.preempt = false
​
  if sched.gcwaiting != 0 {    // 如果需要 GC,不再进行调度
    gcstopm()
    goto top
  }
  if pp.runSafePointFn != 0 {  // 不等于0,说明在安全点
    runSafePointFn()
  }
​
  // 如果 G 所在的 M 在自旋,说明其P运行队列为空,如果不为空,则应该甩出错误
  if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
    throw("schedule: spinning with local work")
  }
        // 运行 P 上准备就绪的 Timer
  checkTimers(pp, 0)
​
  var gp *g
  var inheritTime bool
        ......
  if gp == nil {    // 说明不在 GC
    // 每调度 61 次,就检查一次全局队列,保证公平性;否则两个 Goroutine 可以通过互换,一直占领本地的 runqueue
    if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
      lock(&sched.lock)
      gp = globrunqget(_g_.m.p.ptr(), 1)     // 从全局队列中偷 g
      unlock(&sched.lock)
    }
  }
  if gp == nil {
                // 从 P 的本地队列获取 G
    gp, inheritTime = runqget(_g_.m.p.ptr())
  }
  if gp == nil {
    gp, inheritTime = findrunnable() // 阻塞式查找可用 G
  }
​
  // M 这时候一定是获取到了G
  // 如果 M 是自旋状态,重置其状态到非自旋
  if _g_.m.spinning {
    resetspinning()
  }
        .......
  // 执行 G
  execute(gp, inheritTime)
}

runtime.schedule() 函数会从下面几个地方查找待执行的 Goroutine:

1)为了保证公平,当全局运行队列中有待执行的 G 时,通过 schedtick 对 61 取模,表示每 61 次会有一次从全局的运行队列中查找对应的 G,这样可以避免两个 G 在 P 本地队列互换一直占有本地队列; 2)调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G; 3)如果前两种方法都没有找到 G,会通过 runtime.findrunnable() 进行阻塞地查找 G;

runtime.schedule 函数从全局队列获取 G 的函数是 runtime.globrunqget() 函数:

代码语言:javascript
复制
// 从全局队列获取 G
func globrunqget(_p_ *p, max int32) *g {
  assertLockHeld(&sched.lock)
        // 如果全局队列没有 G,则直接返回
  if sched.runqsize == 0 {
    return nil
  }
        // 计算n,表示从全局队列放入本地队列的 G 的个数
  n := sched.runqsize/gomaxprocs + 1
  if n > sched.runqsize {
    n = sched.runqsize
  }
        // n 不能超过取的要获取的max个数 
  if max > 0 && n > max {
    n = max
  }
        // 计算能不能用本地队列的一般放下 n 个 G,如果放不下,则 n 设为本地队列的一半
  if n > int32(len(_p_.runq))/2 {
    n = int32(len(_p_.runq)) / 2
  }
​
  sched.runqsize -= n
        // 拿到全局队列的队头作为返回的 G
  gp := sched.runq.pop()   
  n--   // n计数减 1
        // 继续取剩下的 n-1个全局队列 G 放入本地队列
  for ; n > 0; n-- {
    gp1 := sched.runq.pop()
    runqput(_p_, gp1, false)
  }
  return gp
}

runtime.globrunqget() 函数会从全局队列获取 n 个 G,第一个 G 返回给调度器去执行,剩下的 n-1 个 G 放入本地队列,其中,n一般为全局队列长度 / P处理器个数 + 1,含义是平均每个 P 应该从全局队列中承担的 G 数量,且不能超过 P 本地长度的一半。

runtime.schedule() 函数调用 runtime.runqget() 函数从 P 本地的运行队列中获取待执行的 G:

代码语言:javascript
复制
// 从 P 本地队列中获取 G
func runqget(_p_ *p) (gp *g, inheritTime bool) {
  // 如果 P 有一个 runnext,则它就是下一个要执行的 G.
  next := _p_.runnext
  // 如果 runnext 不为空,而 CAS 失败, 则它又可能被其他 P 偷取了,
  // 因为其他 P 可以竞争机会到设置 runnext 为 0, 当前 P 只能设置该字段为非0
  if next != 0 && _p_.runnext.cas(next, 0) {
    return next.ptr(), true
  }
​
  for {
    h := atomic.LoadAcq(&_p_.runqhead) //从本地环形队列头遍历
    t := _p_.runqtail
    if t == h {    // 头尾指针相等,表示本地队列为空
      return nil, false
    }
                // 获取头部指针指向的 G
    gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
    if atomic.CasRel(&_p_.runqhead, h, h+1) { 
      return gp, false
    }
  }
}

本地队列的获取会先从 P 的 runnext 字段中获取,如果不为空则直接返回。如果 runnext 为空,那么从本地环形队列头指针遍历本地队列,取到了则返回。

阻塞式获取 G 的 runtime.findrunnable() 函数的整个逻辑看起来比较繁琐,其实无非是按这个顺序获取 G: local -> global -> netpoll -> steal -> local -> global -> netpoll:

代码语言:javascript
复制
// 找到一个可运行的 G 去执行
// 会从其他 P 的运行队列偷取,从本地会全局队列获取,或从网络轮询器获取
func findrunnable() (gp *g, inheritTime bool) {
  _g_ := getg()
​
top:
  _p_ := _g_.m.p.ptr()
  if sched.gcwaiting != 0 {     // 如果在 gc,则休眠当前 m,直到复始后回到 top
    gcstopm()
    goto top
  }
  if _p_.runSafePointFn != 0 {  // 不等于0,说明在安全点
    runSafePointFn()
  }
​
  now, pollUntil, _ := checkTimers(_p_, 0)
  
​
  // 取本地队列 local runq,如果已经拿到,立刻返回
  if gp, inheritTime := runqget(_p_); gp != nil {
    return gp, inheritTime
  }
​
  // 全局队列 global runq,如果已经拿到,立刻返回
  if sched.runqsize != 0 {
    lock(&sched.lock)
    gp := globrunqget(_p_, 0)
    unlock(&sched.lock)
    if gp != nil {
      return gp, false
    }
  }
​
  // 从 netpoll 网络轮询器中尝试获取 G,优先级比从其他 P 偷取 G 要高
  if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
    if list := netpoll(0); !list.empty() { // non-blocking
      gp := list.pop()
      injectglist(&list)
      casgstatus(gp, _Gwaiting, _Grunnable)
      if trace.enabled {
        traceGoUnpark(gp, 0)
      }
      return gp, false
    }
  }
​
  // 自旋 M: 从其他 P 中窃取任务 G
  
  // 限制自旋 M 数量到忙碌P数量的一半. 避免一半P数量、并行机制很慢时的CPU消耗
  procs := uint32(gomaxprocs)
  if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
    if !_g_.m.spinning {
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
    }
                // 从其他 P 或 timer 中偷取G
    gp, inheritTime, tnow, w, newWork := stealWork(now)
    now = tnow
    if gp != nil {
      // Successfully stole.
      return gp, inheritTime
    }
    if newWork {
      // 可能有新的 timer 或 GC,重新开始
      goto top
    }
    if w != 0 && (pollUntil == 0 || w < pollUntil) {
      // Earlier timer to wait for.
      pollUntil = w
    }
  }
​
  // 没有任何 work 可做。        // 如果我们在 GC mark 阶段,则可以安全的扫描并标记对象为黑色        // 然后便有 work 可做,运行 idle-time 标记而非直接放弃当前的 P。
  if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
    node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
    if node != nil {
      _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
      gp := node.gp.ptr()
      casgstatus(gp, _Gwaiting, _Grunnable)
      if trace.enabled {
        traceGoUnpark(gp, 0)
      }
      return gp, false
    }
  }
        .....
  // 放弃当前的 P 之前,对 allp 做一个快照
  allpSnapshot := allp
  idlepMaskSnapshot := idlepMask
  timerpMaskSnapshot := timerpMask
​
  // 准备归还 p,对调度器加锁
  lock(&sched.lock)        // 进入了 gc,回到顶部并停止 m
  if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
    unlock(&sched.lock)
    goto top
  }        // 全局队列中又发现了任务
  if sched.runqsize != 0 {
    gp := globrunqget(_p_, 0)     // 赶紧偷掉返回
    unlock(&sched.lock)
    return gp, false
  }
  if releasep() != _p_ {         // 归还当前的 p
    throw("findrunnable: wrong p")
  }
  pidleput(_p_)       // 将 p 放入 idle 链表
  unlock(&sched.lock)      // 完成归还,解锁
​
  // 这里要非常小心: 线程从自旋到非自旋状态的转换,可能与新 Goroutine 的提交同时发生
  wasSpinning := _g_.m.spinning
  if _g_.m.spinning {
    _g_.m.spinning = false    // M 即将睡眠,状态不再是 spinning
    if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
      throw("findrunnable: negative nmspinning")
    }
​
    // 再次检查所有的 runqueue
    _p_ = checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
    if _p_ != nil {
      acquirep(_p_)
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
      goto top
    }
​
    // 再次检查 idle-priority GC work,和上面重新找 runqueue 的逻辑类似
    _p_, gp = checkIdleGCNoP()
    if _p_ != nil {
      acquirep(_p_)
      _g_.m.spinning = true
      atomic.Xadd(&sched.nmspinning, 1)
​
      // Run the idle worker.
      _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
      casgstatus(gp, _Gwaiting, _Grunnable)
      if trace.enabled {
        traceGoUnpark(gp, 0)
      }
      return gp, false
    }
​
    // 最后, 检查 timer creation
    pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
  }
​
  // 再次检查 netpoll 网络轮询器,和上面重新找 runqueue 的逻辑类似
  if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
    ......
    lock(&sched.lock)
    _p_ = pidleget()
    unlock(&sched.lock)
    if _p_ == nil {
      injectglist(&list)
    } else {
      acquirep(_p_)
      if !list.empty() {
        gp := list.pop()
        injectglist(&list)
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
          traceGoUnpark(gp, 0)
        }
        return gp, false
      }
      if wasSpinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
      }
      goto top
    }
  } else if pollUntil != 0 && netpollinited() {
    pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
    if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
      netpollBreak()
    }
  }
  stopm()     // 真的什么都没找到,暂止当前的 m
  goto top
}

runtime.findrunnable 函数的主要工作是:

1)首先检查是是否正在进行 GC,如果是则休眠当前的 M ; 2)尝试从本地队列中取 G,如果取到,则直接返回,否则继续从全局队列中找 G,如果找到则直接返回; 3)检查 netpoll 网络轮询器中是否有 G,如果有,则直接返回; 4)如果此时仍然无法找到 G,则从其他 P 的本地队列中偷取;从其他 P 本地队列偷取的工作会执行四轮,如果找到 G,则直接返回; 5)所有的可能性都尝试过了,在准备休眠 M 之前,还要进行额外的检查; 6)首先检查此时是否是 GC mark 阶段,如果是,则直接返回 mark 阶段的 G; 7)如果仍然没有,则对当前的 P 进行快照,准备对调度器进行加锁; 8)当调度器被锁住后,仍然还需再次检查这段时间里是否有进入 GC,如果已经进入了 GC,则回到第一步,阻塞 M 并休眠; 9)如果又在全局队列中发现了 G,则直接返回; 10)当调度器被锁住后,我们彻底找不到任务了,则归还释放当前的 P,将其放入 idle 链表中,并解锁调度器; 11)当 M、P 已经解绑后,我们需要将 M 的状态切换出自旋状态,并减少 nmspinning; 12)此时仍然需要重新检查所有的队列,如果我们又在全局队列中发现了 g,则直接返回; 13)还需要再检查是否存在 poll 网络的 G,如果有,则直接返回; 14)什么也没找到,休眠当前的 M。

runtime.findrunnable 函数的逻辑如图 5.2 所示:

图5.2 runtime.findrunnable()函数逻辑
图5.2 runtime.findrunnable()函数逻辑

如果调度循环函数 runtime.schedule() 从通过 runtime.globrunqget() 从全局队列,通过 runtime.runqget() 从 P 本地队列,以及 runtime.findrunnable 从各个地方,获取到了一个可执行的 G, 则会调用 runtime.execute() 函数去执行它,它会通过 runtime.gogo() 将 G 调度到当前线程上开始真正执行,之后 runtime.gogo() 会调用 runtime.goexit(),并依次进入runtime.goexit1(),和 runtime.goexit0(),最后在 runtime.goexit0() 函数的结尾会再次调用 runtime.schedule() ,进入下一次调度循环。

6. 总结

总结的内容已经放在了开头的结论中了。

最近听到一句话:任何领域的顶尖高手,都是在花费大量时间和身心投入后,达到了用灵魂触碰到更高维度的真实存在的境界,而不是用头脑在思考和工作,因此作出来的产品都极具美感、实用性和创造性,就好像偷取了上帝的创意一样。

在 Go 调度器的底层原理的学习中,不仅需要亲自花时间去分析源码的细节,更加要大量阅读 Go 开发者的文章,需要用心体会机制设计背后的原因。

Reference

Go语言设计与实现6.5节调度器 https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/#g

Go语言原本6.3节MPG模型与并发调度单元 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/mpg/

Go调度器系列(3)图解调度原理 https://lessisbetter.site/2019/04/04/golang-scheduler-3-principle-with-graph/

Golang的协程调度器原理及GMP设计思想 https://www.yuque.com/aceld/golang/srxd6d

详解Go语言调度循环源码实现 https://www.luozhiyun.com/archives/448

golang源码分析之协程调度器底层实现( G、M、P) https://blog.csdn.net/qq_25870633/article/details/83445946

「译」Scheduling In Go Part I - OS Scheduler https://blog.lever.wang/golang/os_schedule/

「译」Scheduling In Go Part II - Go Scheduler https://blog.lever.wang/golang/go_schedule/

深入 golang -- GMP调度 https://zhuanlan.zhihu.com/p/502740833

深度解密 Go 语言之 scheduler https://qcrao.com/post/dive-into-go-scheduler/

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0. 结论
  • 1. GMP调度模型的设计思想
    • 1.1 传统多线程的问题
      • 1.2 Go语言早期引入的 GM 模型
        • 1.3 当前高效的 GMP 模型
        • 2. 多图详解几种常见的调度场景
        • 3. GMP的数据结构和各种状态
          • 3.1 G 的数据结构和状态
            • 3.2 M 的数据结构
              • 3.3 P 的数据结构
                • 3.4 schedt 的数据结构
                • 4. 调度器的启动
                  • 4.1 程序启动流程
                    • 4.2 调度器的启动
                      • 4.3 怎样创建 G ?
                      • 5. 调度循环
                      • 6. 总结
                      • Reference
                      相关产品与服务
                      负载均衡
                      负载均衡(Cloud Load Balancer,CLB)提供安全快捷的四七层流量分发服务,访问流量经由 CLB 可以自动分配到多台后端服务器上,扩展系统的服务能力并消除单点故障。轻松应对大流量访问场景。 网关负载均衡(Gateway Load Balancer,GWLB)是运行在网络层的负载均衡。通过 GWLB 可以帮助客户部署、扩展和管理第三方虚拟设备,操作简单,安全性强。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档