首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >【初识Go】| Day13 并发编程

【初识Go】| Day13 并发编程

作者头像
yussuy
修改于 2020-12-28 03:46:19
修改于 2020-12-28 03:46:19
4400
举报

Erlang 之父 Joe Armstrong曾经以下图解释并发与并行。

cor.jpg
cor.jpg

并发在图中的解释是两队人排队接咖啡,两队切换。

并行是两个咖啡机,两队人同时接咖啡。

“Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.” — Rob Pike

并发使并行变得容易,并发提供了一种构造解决方案的方法,并行一般伴随这多核。并发一般伴随这CPU切换轮训。

为什么需要并发?

原因有很多,其中比较重要的原因如下:

  1. 不阻塞等待其他任务的执行,从而浪费时间,影响系统性能。
  2. 并行可以使系统变得简单些,将复杂的大任务切换成许多小任务执行,单独测试。

在开发中,经常会遇到为什么某些进程通常会相互等待呢?为什么有些运行慢,有些快呢?

通常受限来源于进程I/OCPU

  • 进程I/O限制

如:等待网络或磁盘访问

  • CPU限制

如:大量计算

Go语言中的并发程序可以用两种手段来实现:goroutine和channel。

协程Goroutines

每个go程序至少都有一个Goroutine:主Goroutine(在运行进程时自动创建)。以及程序中其他Goroutine 例如:下面程序创建了main的Goroutine及匿名的Goroutine。

代码语言:txt
AI代码解释
复制
func main() {

    go func() {

        fmt.Println("you forgot me !")

    }()

}

在go中有个package是sync,里面包含了:

WaitGroup、Mutex、Cond、Once、Pool,下面依次介绍。

WaitGroup

假设主线程要等待其余的goroutine都运行完毕,不得不在末尾添加time.Sleep(),但是这样会引发两个问题:

  • 等待多长时间?
  • 时间太长,影响性能?

在go的sync库中的WaitGroup可以帮助我们完成此项工作,Add(n)把计数器设置为n,Done()会将计数器每次减1,Wait()函数会阻塞代码运行,直到计数器减0。

等待多个goroutine完成,可以使用一个等待组。 例如:

代码语言:txt
AI代码解释
复制
// 这是我们将在每个goroutine中运行的函数。

// 注意,等待组必须通过指针传递给函数。

func worker(id int, wg *sync.WaitGroup) {



    defer wg.Done()



    fmt.Printf("Worker %d starting\n", id)



    time.Sleep(time.Second)

    fmt.Printf("Worker %d done\n", id)

}



func main() {



    var wg sync.WaitGroup



    for i := 1; i <= 5; i++ {

        wg.Add(1)

        go worker(i, &wg)

    }



    wg.Wait()

}

这里首先把wg 计数设置为1, 每个for循环运行完毕都把计数器减一,主函数中使用Wait() 一直阻塞,直到wg为1——也就是所有的5个for循环都运行完毕。

使用注意点:

  • 计数器不能为负值
  • WaitGroup对象不是引用类型

Once

sync.Once可以控制函数只能被调用一次,不能多次重复调用。

例如:

代码语言:txt
AI代码解释
复制
var doOnce sync.Once



func main() {

    DoSomething()

    DoSomething()

}



func DoSomething() {

    doOnce.Do(func() {

        fmt.Println("Run once - first time, loading...")

    })go

    fmt.Println("Run this every time")

}

输出:

代码语言:txt
AI代码解释
复制
Run once - first time, loading...

Run this every time

Run this every tim

互斥锁Mutex

互斥锁是并发程序对共享资源进行访问控制的主要手段,在go中的sync中提供了Mutex的支持。

例如:使用互斥锁解决多个Goroutine访问同一变量。

代码语言:txt
AI代码解释
复制
// SafeCounter 的并发使用是安全的。

type SafeCounter struct {

    v   map[string]int

    mux sync.Mutex

}



// Inc 增加给定 key 的计数器的值。

func (c *SafeCounter) Inc(key string) {

  c.mux.Lock()

  defer c.mux.Unlock()

    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v

  c.v[key]++

}



// Value 返回给定 key 的计数器的当前值。

func (c *SafeCounter) Value(key string) int {

    c.mux.Lock()

    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v

    defer c.mux.Unlock()

    return c.v[key]

}



func main() {

    c := SafeCounter{v: make(map[string]int)}

    for i := 0; i < 1000; i++ {

        go c.Inc("somekey")

    }



    time.Sleep(time.Second)

    fmt.Println(c.Value("somekey"))

}

在这个例子中,我们使用了sync.Mutex的Lock与Unlock方法。

在前面例子中我们使用了sync.Mutex,读操作与写操作都会被阻塞。其实读操作的时候我们是不需要进行阻塞的,因此sync中还有另一个锁:读写锁RWMutex,这是一个单写多读模型。

sync.RWMutex分为:读、写锁。在读锁占用下,会阻止写,但不会阻止读,多个goroutine可以同时获取读锁,调用RLock()函数即可,RUnlock()函数释放。写锁会阻止任何goroutine进来,整个锁被当前goroutine,此时等价于Mutex,写锁调用Lock启用,通过UnLock()释放。

例如: 我们对上述例子进行改写,读的时候用读锁,写的时候用写锁。

代码语言:txt
AI代码解释
复制
// SafeCounter 的并发使用是安全的。

type SafeCounter struct {

    v     map[string]int

    rwmux sync.RWMutex

}



// Inc 增加给定 key 的计数器的值。

func (c *SafeCounter) Inc(key string) {

    // 写操作使用写锁

    c.rwmux.Lock()

    defer c.rwmux.Unlock()

    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v

    c.v[key]++

}



// Value 返回给定 key 的计数器的当前值。

func (c *SafeCounter) Value(key string) int {

  // 读的时候加读锁

    c.rwmux.RLock()

    // Lock 之后同一时刻只有一个 goroutine 能访问 c.v

    defer c.rwmux.RUnlock()

    return c.v[key]

}



func main() {

    c := SafeCounter{v: make(map[string]int)}

    for i := 0; i < 1000; i++ {

        go c.Inc("somekey")

    }



    time.Sleep(time.Second)



    for i := 0; i < 10; i++ {

        fmt.Println(c.Value("somekey"))

    }

}

条件变量Cond

sync.Cond是条件变量,它可以让一系列的 Goroutine 都在满足特定条件时被唤醒。

条件变量通常与互斥锁一起使用,条件变量可以在共享资源的状态变化时通知相关协程。 经常使用的函数如下:

  • NewCond

创建一个Cond的条件变量。

代码语言:txt
AI代码解释
复制
func NewCond(l Locker) *Cond
  • Broadcast

广播通知,调用时可以加锁,也可以不加。

代码语言:txt
AI代码解释
复制
func (c *Cond) Broadcast()
  • Signal

单播通知,只唤醒一个等待c的goroutine。

代码语言:txt
AI代码解释
复制
func (c *Cond) Signal()
  • Wait 等待通知, Wait()会自动释放c.L,并挂起调用者的goroutine。之后恢复执行,Wait()会在返回时对c.L加锁。

除非被Signal或者Broadcast唤醒,否则Wait()不会返回。

代码语言:txt
AI代码解释
复制
func (c *Cond) Wait()

例如:使用WaitGroup等待两个Goroutine完成, Goroutine1与Goroutine2进入Wait状态,main函数在2s后改变共享数据状态,调用Broadcast函数,此时c.Wait从中恢复并判断条件变量是否已经满足,满足后消费条件,解锁,wg.Done()。

原子操作

原子操作即是进行过程中不能被中断的操作。针对某个值的原子操作在被进行的过程中,CPU绝不会再去进行其他的针对该值的操作。 为了实现这样的严谨性,原子操作仅会由一个独立的CPU指令代表和完成。

在sync/atomic 中,提供了一些原子操作,包括加法(Add)、比较并交换(Compare And Swap,简称 CAS)、加载(Load)、存储(Store)和交换(Swap)。

1.加法操作 提供了32/64位有符号与无符号加减操作

代码语言:txt
AI代码解释
复制
var i int64

atomic.AddInt64(&i, 1)

fmt.Println("i = i + 1 =", i)

atomic.AddInt64(&i, -1)

fmt.Println("i = i - 1 =", i

2.比较并交换

CAS: Compare And Swap

如果addr和old相同,就用new代替addr。

代码语言:txt
AI代码解释
复制
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

例如:

代码语言:txt
AI代码解释
复制
var a int32 = 1

var b int32 = 2

var c int32 = 3

ok := atomic.CompareAndSwapInt32(&a, a, b)

fmt.Printf("ok = %v, a = %v, b = %v\n", ok, a, b)

ok = atomic.CompareAndSwapInt32(&a, c, b)

fmt.Printf("ok = %v, a = %v, b = %v, c=%v\n", ok, a, b, c)

输出:

代码语言:txt
AI代码解释
复制
ok = true, a = 2, b = 2

ok = false, a = 2, b = 2, c = 3

3.交换

不管旧值与新值是否相等,都会通过新值替换旧值,返回的值是旧值。

代码语言:txt
AI代码解释
复制
func SwapInt32(addr *int32, new int32) (old int32)

例如:

代码语言:txt
AI代码解释
复制
var x int32 = 1

var y int32 = 2

old := atomic.SwapInt32(&x, y)

fmt.Println(x, old)

输出:2 1

3.加载

当读取该指针指向的值时,CPU 不会执行任何其它针对此值的读写操作

代码语言:txt
AI代码解释
复制
func LoadInt32(addr *int32) (val int32)

例如:

代码语言:txt
AI代码解释
复制
var x1 int32 = 1

y1 := atomic.LoadInt32(&x)

fmt.Println("x1, y1:", x1, y1)

4.存储

加载逆向操作。

例如:

代码语言:txt
AI代码解释
复制
var xx int32 = 1

var yy int32 = 2

atomic.StoreInt32(&yy, atomic.LoadInt32(&xx))

fmt.Println(xx, yy)

5.原子类型

sync/atomic中添加了一个新的类型Value。 例如:

代码语言:txt
AI代码解释
复制
v := atomic.Value{}

v.Store(1)

fmt.Println(v.Load())

临时对象池Pool

ync.Pool 可以作为临时对象的保存和复用的集合

P是Goroutine中的重要组成之一,例如:P实际上在操作时会为它的每一个goroutine相关的P生成一个本地P。 本地池没有,则会从其它的 P 本地池获取,或者全局P取。

sync.Pool对于需要重复分配、回收内存的地方,sync.Pool 是一个很好的选择。减少GC负担,如果Pool中有对象,下次直接取,不断服用对象内存,减轻 GC 的压力,提升系统的性能。

例如:

代码语言:txt
AI代码解释
复制
var pool *sync.Pool



type Foo struct {

    Name string

}



func Init() {

    pool = &sync.Pool{

        New: func() interface{} {

            return new(Foo)

        },

    }

}



func main() {

    fmt.Println("Init p")

    Init()



    p := pool.Get().(*Foo)

    fmt.Println("第一次取:", p)

    p.Name = "bob"

    pool.Put(p)



    fmt.Println("池子有对象了,调用获取", pool.Get().(*Foo))

    fmt.Println("池子空了", pool.Get().(*Foo))

}

输出:

代码语言:txt
AI代码解释
复制
Init p

第一次取: &{}

池子有对象了,调用获取 &{bob}

池子空了 &{}

通道Channel

Channel

1) 使用

Channel的使用需要通过make创建,例如:

代码语言:txt
AI代码解释
复制
unBufferChan := make(chan int) 

bufferChan := make(chan int, x) 

上述创建了无缓冲的Channel与有缓冲的Channel,创建完成之后,需要进行读写操作,如下:

代码语言:txt
AI代码解释
复制
ch := make(chan int, 1)



// 读操作

x <- ch



// 写操作

ch <- x

最终要正确关闭,只需要调用close即可。

代码语言:txt
AI代码解释
复制
// 关闭

close(ch)

当channel关闭后会引发下面相关问题:

  • 重复关闭Channel 会 panic
  • 向关闭的Channel发数据 会 Panic,读关闭的Channel不会Panic,但读取的是默认值

对于最后一点读操作默认值怎么区分呢?例如:Channel本身的值是默认值又或者是读到的是关闭后的默认值,可以通过下面进行区分:

代码语言:txt
AI代码解释
复制
val, ok := <-ch

if ok == false {

    // channel closed

}

2) Channel分类

  • 无缓冲的Channel

发送与接受同时进行。如果没有Goroutine读取Channel(<-Channel),发送者(Channel<-x)会一直阻塞。

unbufferedchannel.png
unbufferedchannel.png
  • 有缓冲的Channel

发送与接受并非同时进行。当队列为空,接受者阻塞;队列满,发送者阻塞。

bufferedchannel.png
bufferedchannel.png

Select

  • 每个case 都必须是一个通信
  • 所有channel表达式都会被求值
  • 如果没有default语句,select将阻塞,直到某个通信可以运行
  • 如果多个case都可以运行,select会随机选择一个执行

1) 随机选择

select特性之一:随机选择,下面会随机打印不同的case结果。 例如:

代码语言:txt
AI代码解释
复制
ch := make(chan int, 1)

ch <- 1

select {

case <-ch:

    fmt.Println("ch 1")

case <-ch:

    fmt.Println("ch 2")

default:

    fmt.Println("ch default")

}

假设chan中没有值,有可能引发死锁。

例如: 下面执行后会引发死锁。

代码语言:txt
AI代码解释
复制
ch := make(chan int, 1)

select {

case <-ch:

    fmt.Println("ch 1")

case <-ch:

    fmt.Println("ch 2")

}

此时可以加上default即可解决。

代码语言:txt
AI代码解释
复制
default:

    fmt.Println("ch default")

另外,还可以添加超时。

代码语言:txt
AI代码解释
复制
timeout := make(chan bool, 1)

go func() {

    time.Sleep(2 * time.Second)

    timeout <- true

}()

ch := make(chan int, 1)



select {

case <-ch:

    fmt.Println("ch 1")

case <-timeout:

    fmt.Println("timeout 1")

case <-time.After(time.Second * 1):

    fmt.Println("timeout 2")

}

2) 检查chan

select+defaul方式来确保channel是否满

代码语言:txt
AI代码解释
复制
ch := make(chan int, 1)

ch <- 1

select {

case ch <- 1:

    fmt.Println("channel value is ", <-ch)

    fmt.Println("channel value is ", <-ch)

default:

    fmt.Println("channel blocking")

}

如果要调整channel大小,可以在make的时候改变size,这样就可以在case中往channel继续写数据。

3) 选择循环

当多个channel需要读取数据的时候,就必须使用 for+select

例如:下面例子需要从两个channel中读取数据,当从channel1中数据读取完毕后,会像signal channel中输入stop,此时终止for+select。

代码语言:txt
AI代码解释
复制
func f1(c chan int, s chan string) {

    for i := 0; i < 10; i++ {

        time.Sleep(time.Second)

        c <- i

    }

    s <- "stop"

}



func f2(c chan int, s chan string) {

    for i := 20; i >= 0; i-- {

        time.Sleep(time.Second)

        c <- i

    }

    s <- "stop"

}



func main() {

    c1 := make(chan int)

    c2 := make(chan int)

    signal := make(chan string, 10)



    go f1(c1, signal)

    go f2(c2, signal)

LOOP:

    for {

        select {

        case data := <-c1:

            fmt.Println("c1 data is ", data)

        case data := <-c2:

            fmt.Println("c2 data is ", data)

        case data := <-signal:

            fmt.Println("signal is ", data)

            break LOOP

        }

    }

}

参考资料

https://github.com/datawhalechina/go-talent/blob/master/12.%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B.md

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
​Golang 并发编程指南
作者:dcguo,腾讯 CSIG 电子签开放平台中心 分享 Golang 并发基础库,扩展以及三方库的一些常见问题、使用介绍和技巧,以及对一些并发库的选择和优化探讨。 go 原生/扩展库 提倡的原则 不要通过共享内存进行通信;相反,通过通信来共享内存。 Goroutine goroutine 并发模型 调度器主要结构 主要调度器结构是 M,P,G M,内核级别线程,goroutine 基于 M 之上,代表执行者,底层线程,物理线程 P,处理器,用来执行 goroutine,因此维护了一个 gorout
腾讯技术工程官方号
2021/12/20
1.5K0
Golang 基础:原生并发 goroutine channel 和 select 常见使用场景
goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。
张拭心 shixinzhang
2022/05/10
1.2K1
Golang 基础:原生并发 goroutine channel 和 select 常见使用场景
Go 专栏|并发编程:goroutine,channel 和 sync
原文链接: Go 专栏|并发编程:goroutine,channel 和 sync
AlwaysBeta
2021/09/16
6840
Go 专栏|并发编程:goroutine,channel 和 sync
Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现
上一篇 《原生并发 goroutine channel 和 select 常见使用场景》 介绍了基于 CSP 模型的并发方式。
张拭心 shixinzhang
2022/05/10
4360
Golang 基础:底层并发原语 Mutex RWMutex Cond WaitGroup Once等使用和基本实现
【Golang】并发
go 程(goroutine)是 go 并发的核心,它比线程要更小, 由 go Runtime 管理,运行 goroutine 只需要很少的栈空间,因此可以实现很大的并发量,在 go 中,开启一个 goroutine 只需要使用 go 关键字即可:
JuneBao
2022/10/26
4710
[警惕] 请勿滥用goroutine
在Go语言中,goroutine的创建成本很低,调度效率高,Go语言在设计时就是按以数万个goroutine为规范进行设计的,数十万个并不意外,但是goroutine在内存占用方面确实具有有限的成本,你不能创造无限数量的它们,比如这个例子:
Golang梦工厂
2022/07/11
5230
[警惕] 请勿滥用goroutine
避坑:Go并发编程时,如何避免发生竞态条件和数据竞争
现在,我们已经知道了。在编写并发程序时,如果不谨慎,没有考虑清楚共享资源的访问方式和同步机制,那么就会发生竞态条件和数据竞争这些问题,那么如何避免踩坑?避免发生竞态条件和数据竞争的办法有哪些?请看下面:
不背锅运维
2023/04/25
1.1K0
避坑:Go并发编程时,如何避免发生竞态条件和数据竞争
你应该知道的 Go WaitGroup 剖析
本篇主要介绍 WaitGroup 的一些特性,让我们从本质上去了解 WaitGroup。关于 WaitGroup 的基本用法这里就不做过多介绍了。相对于《这可能是最容易理解的 Go Mutex 源码剖析》来说,WaitGroup 就简单的太多了。
haohongfan
2021/04/26
5460
你应该知道的 Go WaitGroup 剖析
Go 并发编程面试题
在 Go 语言的同步库中,sync.Mutex是用来提供互斥锁的基本同步原语。Mutex用于保护共享资源,在多个 goroutine 尝试同时访问相同资源时确保只有一个 goroutine 能够访问该资源,从而避免竞态条件。
Lemon黄
2023/12/13
8420
Go 并发编程面试题
Go语言核心36讲(Go语言实战与应用九)--学习笔记
我们在前几次讲的互斥锁、条件变量和原子操作都是最基本重要的同步工具。在 Go 语言中,除了通道之外,它们也算是最为常用的并发安全工具了。
郑子铭
2021/11/21
2370
Go语言核心36讲(Go语言实战与应用九)--学习笔记
Go并发编程
百度Go语言优势,肯定有一条是说Go天生就有支持并发的优势,其他语言支持多线程并发,需要一定的门槛,基础的积累,学习多线程、进程语法。在Go中,就不需要考虑这些,原生提供goroutine(协程),自动帮你处理任务,
用户9022575
2021/10/01
5890
盘点Golang并发那些事儿之二
上一节提到,golang中直接使用关键字go创建goroutine,无法满足我们的需求。主要问题如下
PayneWu
2021/06/10
5310
盘点Golang并发那些事儿之二
Golang并发编程控制
重学编程之Golang的plan中的上一篇文章我向大家介绍了,并发编程基础,goroutine的创建,channel,正由于go语言的简洁性,我们可以简易快速的创建任意个协程。同时也留下了许多隐患,如果没有更加深入的学习,其实很难直接将其运用到实际项目中,实际生活中。为什么呢?并发的场景许许多多,但一味的只知道其创建,是很难有效的解决问题。例如以下场景-资源竞争
PayneWu
2020/12/18
5990
Golang并发编程控制
Go并发编程基础(译)
原文:Fundamentals of concurrent programming 译者:youngsterxyf 本文是一篇并发编程方面的入门文章,以Go语言编写示例代码,内容涵盖: 运行期并发线程(goroutines) 基本的同步技术(管道和锁) Go语言中基本的并发模式 死锁和数据竞争 并行计算 在开始阅读本文之前,你应该知道如何编写简单的Go程序。如果你熟悉的是C/C++、Java或Python之类的语言,那么 Go语言之旅 能提供所有必要的背景知识。也许你还有兴趣读一读 为C++程序员准备的Go
李海彬
2018/03/26
1.5K0
Go并发编程基础(译)
Golang并发控制方式有几种?
Go语言中的goroutine是一种轻量级的线程,其优点在于占用资源少、切换成本低,能够高效地实现并发操作。但如何对这些并发的goroutine进行控制呢?
孟斯特
2024/01/24
7310
Golang并发控制方式有几种?
深入理解Go语言的并发模型
并发(Concurrency)与并行(Parallelism)是两个常常混淆的概念。并发指的是在同一时间段内处理多个任务,而并行则是指在同一时刻同时执行多个任务。Go语言的并发模型更侧重于并发,通过goroutines和channels来管理任务之间的交互和通信。
二一年冬末
2024/06/28
1930
go 并发编程
Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutex、sync.RWMutex、sync.WaitGroup、sync.Once。
haifeiWu
2020/07/03
7830
go 并发编程
17.Go语言-线程同步
在 Go 语言中,经常会遇到并发的问题,当然我们会优先考虑使用通道,同时 Go 语言也给出了传统的解决方式 Mutex(互斥锁) 和 RWMutex(读写锁) 来处理竞争条件。
面向加薪学习
2022/09/04
2990
channel练习题
编写一个程序,其中两个 goroutine 来回传递一个整数十次。当每个 goroutine 接收到整数时打印。每次通过整数都增加。一旦整数等于 10,立刻终止程序。
Michel_Rolle
2024/09/17
3K0
Go通关10:并发控制,同步原语 sync 包
您诸位好啊,我是无尘。又到了愉快的周末,肝了一上午,给大家介绍下 sync 包。除了上一节我们介绍的 channel 通道,还有 sync.Mutex、sync.WaitGroup 这些原始的同步机制,来更加灵活的实现数据同步和控制并发。
微客鸟窝
2021/08/18
6050
相关推荐
​Golang 并发编程指南
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档