在 Go 语言中,协程(goroutine)是轻量级的执行单元,虽然开销小,但无限制地创建协程仍然会消耗大量系统资源,甚至导致程序崩溃。因此,合理控制协程数量是编写高效 Go 程序的关键。本文将介绍几种常用的协程数量控制方法,并结合具体案例说明其用法。
带缓冲的通道可以作为一个简易的信号量(Semaphore),通过控制通道的容量来限制同时运行的协程数量。
基本原理:
案例代码:
package main
import (
"fmt"
"time"
)
func worker(id int, sem chan struct{}) {
defer func() { <-sem }() // 释放令牌
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
const maxGoroutines = 3 // 最大协程数量
sem := make(chan struct{}, maxGoroutines)
totalTasks := 10 // 总任务数
for i := 0; i < totalTasks; i++ {
sem <- struct{}{} // 获取令牌,若满则等待
go worker(i, sem)
}
// 等待所有令牌被释放(所有协程完成)
for i := 0; i < cap(sem); i++ {
sem <- struct{}{}
}
fmt.Println("所有任务完成")
}
sync.WaitGroup
用于等待一组协程完成,结合通道可以更灵活地控制协程数量。
案例代码:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
}
func main() {
const maxGoroutines = 3
sem := make(chan struct{}, maxGoroutines)
var wg sync.WaitGroup
totalTasks := 10
for i := 0; i < totalTasks; i++ {
sem <- struct{}{}
wg.Add(1)
go func(id int) {
defer func() { <-sem }()
worker(id, &wg)
}(i)
}
wg.Wait() // 等待所有任务完成
fmt.Println("所有任务完成")
}
工作池模式创建固定数量的工作协程,从任务队列中获取任务执行,适用于任务数量多且可批量处理的场景。
案例代码:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
time.Sleep(time.Second) // 模拟处理时间
results <- job * 2 // 模拟处理结果
}
}
func main() {
const (
numWorkers = 3 // 工作协程数量
numJobs = 10 // 任务数量
)
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 启动工作协程
wg.Add(numWorkers)
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results, &wg)
}
// 发送任务
go func() {
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // 所有任务发送完毕,关闭通道
}()
// 等待所有工作协程完成
go func() {
wg.Wait()
close(results) // 所有结果处理完毕,关闭通道
}()
// 收集结果
for result := range results {
fmt.Printf("收到结果: %d\n", result)
}
fmt.Println("所有任务完成")
}
对于复杂场景,可以使用成熟的第三方库,如 golang.org/x/sync/errgroup
或 github.com/panjf2000/ants
(高性能协程池)。
使用 errgroup 的案例:
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func worker(id int) error {
fmt.Printf("Worker %d 开始工作\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d 完成工作\n", id)
return nil
}
func main() {
const maxGoroutines = 3
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(maxGoroutines) // 设置最大并发数
totalTasks := 10
for i := 0; i < totalTasks; i++ {
id := i
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return worker(id)
}
})
}
if err := g.Wait(); err != nil {
fmt.Printf("发生错误: %v\n", err)
} else {
fmt.Println("所有任务完成")
}
}