通道(channel) ,就是一个管道,可以想像成 Go 协程之间通信的管道。它是一种队列式的数据结构,遵循先入先出的规则。
每个通道都只能传递一种数据类型的数据,在你声明的时候,我们要指定通道的类型。chan Type
表示 Type
类型的通道。通道的零值为 nil
。
var channel_name chan channel_types
下面的语句声明了一个类型为 string
的通道 nameChan
,该通道 nameChan
的值为 nil
。
var ch chan string
声明完通道后,通道的值为 nil
,我们不能直接使用,必须先使用 make
函数对通道进行初始化操作。
ch = make(chan channel_type)
使用下面的语句我们可以对上面声明过的通道 ch
进行初始化:
ch = make(chan string)
这样,我们就已经定义好了一个 string
类型的通道 nameChan
。当然,也可以使用简短声明语句一次性定义一个通道:
ch := make(chan string)
往通道发送数据使用的是下面的语法:
// 把 data 数据发送到 channel_name 通道中
// 即把 data 数据写入到 channel_name 通道中
channel_name <- data
从通道接收数据使用的是下面的语法:
// 从 channel_name 通道中接收数据到 value
// 即从 channel_name 通道中读取数据到 value
value := <- channel_name
通道旁的箭头方向指定了是发送数据还是接收数据。箭头指向通道,代表数据写入到通道中;箭头往通道指向外,代表从通道读数据出去。
下面的例子演示了通道的使用:
package main
import (
"fmt"
)
func PrintChan(c chan string) {
// 往通道传入数据 "从0到Go语言微服务架构师"
c <- "从0到Go语言微服务架构师"
}
func main() {
// 创建一个通道
ch := make(chan string)
// 打印 "学习课程:"
fmt.Println("学习课程:")
// 开启协程
go PrintChan(ch)
// 从通道接收数据
rec := <- ch
// 打印从通道接收到的数据
fmt.Println(rec)
// 打印 "学习目标:全面掌握Go语言微服务落地,代码级一次性解决微服务和分布式系统。"
fmt.Println("学习目标:全面掌握Go语言微服务落地,代码级一次性解决微服务和分布式系统。")
}
该程序模拟了两个协程并发调用的场景,在 main
函数中,创建了一个通道,在 main
函数中先打印了 学习课程:
,然后开启协程运行 PrintChan
函数,而 main
函数通过协程接收数据,主协程发生了阻塞,等待通道 ch
发送的数据,在函数中,数据 从0到Go语言微服务架构师
传入通道中,当写入完成时,主协程接收了数据,解除了阻塞状态,打印出从通道接收到的数据 从0到Go语言微服务架构师
,最后打印 `学习目标:全面掌握 Go 语言微服务落地,代码级一次性解决微服务和分布式系统。
Tips: <font color=red>发送与接收默认是阻塞的</font>
对于一个已经使用完毕的通道,我们要将其进行关闭。
close(channel_name)
这里要注意,对于一个已经关闭的通道如果再次关闭会导致报错,我们可以在接收数据时,判断通道是否已经关闭,从通道读取数据返回的第二个值表示通道是否没被关闭,如果已经关闭,返回值为 false
;如果还未关闭,返回值为 true
。
value, ok := <- channel_name
我们在前面讲过 make
函数是可以接收两个参数的,同理,创建通道可以传入第二个参数——容量。
0
时,说明通道中不能存放数据,在发送数据时,必须要求立马有人接收,否则会报错。此时的通道称之为无缓冲通道。1
时,说明通道只能缓存一个数据,若通道中已有一个数据,此时再往里发送数据,会造成程序阻塞。利用这点可以利用通道来做锁。1
时,通道中可以存放多个数据,可以用于多个协程之间的通信管道,共享资源。既然通道有容量和长度,那么我们可以通过 cap
函数和 len
函数获取通道的容量和长度。
package main
import (
"fmt"
)
func main() {
// 创建一个通道
c := make(chan int, 3)
fmt.Println("初始化后:")
fmt.Println("cap =", cap(c))
fmt.Println("len =", len(c))
c <- 1
c <- 2
fmt.Println("传入两个数后:")
fmt.Println("cap =", cap(c))
fmt.Println("len =", len(c))
<- c
fmt.Println("取出一个数后:")
fmt.Println("cap =", cap(c))
fmt.Println("len =", len(c))
}
按照是否可缓冲数据可分为:缓冲通道 与 无缓冲通道 。
无缓冲通道在通道里无法存储数据,接收端必须先于发送端准备好,以确保你发送完数据后,有人立马接收数据,否则发送端就会造成阻塞,原因很简单,通道中无法存储数据。也就是说发送端和接收端是同步运行的。
c := make(chan int)
// 或者
c := make(chan int, 0)
缓冲通道允许通道里存储一个或多个数据,设置缓冲区后,发送端和接收端可以处于异步的状态。
c := make(chan int, 3)
到目前为止,上面定义的都是双向通道,既可以发送数据也可以接收数据。例如:
package main
import (
"fmt"
"time"
)
func main() {
// 创建一个通道
c := make(chan int)
// 发送数据
go func() {
fmt.Println("send: 1")
c <- 1
}()
// 接收数据
go func() {
n := <- c
fmt.Println("receive:", n)
}()
// 主协程休眠
time.Sleep(time.Millisecond)
}
单向通道只能发送或者接收数据。所以可以具体细分为只读通道和只写通道。
<-chan
表示只读通道:
// 定义只读通道
c := make(chan string)
// 定义类型
type Receiver = <-chan string
var receiver Receiver = c
// 或者简单写成下面的形式
type Receiver = <-chan int
receiver := make(Receiver)
chan<-
表示只写通道:
// 定义只写通道
c := make(chan int)
// 定义类型
type Sender = chan<- int
var sender Sender = c
// 或者简单写成下面的形式
type Sender = chan<- int
sender := make(Sender)
下面是一个例子:
package main
import (
"fmt"
"time"
)
// Sender 只写通道类型
type Sender = chan<- string
// Receiver 只读通道类型
type Receiver = <-chan string
func main() {
// 创建一个双向通道
var ch = make(chan string)
// 开启一个协程
go func() {
// 只能写通道
var sender Sender = ch
fmt.Println("即将学习:")
sender <- "Go语言微服务架构核心22讲"
}()
// 开启一个协程
go func() {
// 只能读通道
var receiver Receiver = ch
message := <-receiver
fmt.Println("开始学习: ", message)
}()
time.Sleep(time.Millisecond)
}
使用 for range
循环可以遍历通道,但在遍历时要确保通道是处于关闭状态,否则循环会被阻塞。
package main
import (
"fmt"
)
func loopPrint(c chan int) {
for i := 0; i < 10; i++ {
c <- i
}
// 记得要关闭通道
// 否则主协程遍历完不会结束,而会阻塞
close(c)
}
func main() {
// 创建一个通道
var ch2 = make(chan int, 5)
go loopPrint(ch2)
for v := range ch2 {
fmt.Println(v)
}
}
上面讲过,当通道容量为 1
时,说明通道只能缓存一个数据,若通道中已有一个数据,此时再往里发送数据,会造成程序阻塞。例如:
package main
import (
"fmt"
"time"
)
// 由于 x = x+1 不是原子操作
// 所以应避免多个协程对 x 进行操作
// 使用容量为 1 的通道可以达到锁的效果
func increment(ch chan bool, x *int) {
ch <- true
*x = *x + 1
<- ch
}
func main() {
ch3 := make(chan bool, 1)
var x int
for i := 0; i < 10000; i++ {
go increment(ch3, &x)
}
time.Sleep(time.Millisecond)
fmt.Println("x =", x)
}
讲完了锁,不得不提死锁。当协程给一个通道发送数据时,照理说会有其他 Go 协程来接收数据。如果没有的话,程序就会在运行时触发 panic
,形成死锁。同理,当有协程等着从一个通道接收数据时,我们期望其他的 Go 协程会向该通道写入数据,要不然程序也会触发 panic
。
package main
func main() {
ch := make(chan bool)
ch <- true
}
运行上面的程序,会触发 panic ,报下面的错误:
fatal error: all goroutines are asleep - deadlock!
下面再来看看几个例子。
package main
import "fmt"
func main() {
ch := make(chan bool)
ch <- true
fmt.Println(<-ch)
}
上面的代码你看起来可能觉得没啥问题,创建一个通道,往里面写入数据,再从里面读出数据,但运行后会报同样的错误:
fatal error: all goroutines are asleep - deadlock!
那么为什么会出现死锁呢?前面的基础学的好的就不难想到使用 make
函数创建通道时默认不传递第二个参数,通道中不能存放数据,在发送数据时,必须要求立马有人接收,即该通道为无缓冲通道。所以在接收者没有准备好前,发送操作会被阻塞。
分析完引发异常的原因后,我们可以将代码修改如下,使用协程,将接收者代码放在另一个协程里:
package main
import (
"fmt"
"time"
)
func funcRecieve(c chan bool) {
fmt.Println(<-c)
}
func main() {
ch4 := make(chan bool)
go funcRecieve(ch4)
ch4 <- true
time.Sleep(time.Millisecond)
}
当然,还有一种更加直接的方法,把无缓冲通道改为缓冲通道就行了:
package main
import "fmt"
func main() {
ch5 := make(chan bool, 1)
ch5 <- true
fmt.Println(<-ch5)
}
有时候我们定义了通道的容量,但通道里的容量已经放不下新的数据,而没有接收者接收数据,就会造成阻塞,而对于一个协程来说就会造成死锁:
package main
import "fmt"
func main() {
ch6 := make(chan bool, 1)
ch6 <- true
ch6 <- false
fmt.Println(<-ch6)
}
同理,当程序一直在等待从通道里读取数据,而此时并没有发送者会往通道中写入数据。此时程序就会陷入死循环,造成死锁。
在实际开发中我们并不能保证每个协程执行的时间,如果需要等待多个协程,全部结束任务后,再执行某个业务逻辑。下面我们介绍处理这种情况的方式。
WaitGroup
有几个方法:
Add
:初始值为 0
,这里直接传入子协程的数量,你传入的值会往计数器上加。Done
:当某个子协程完成后,可调用此方法,会从计数器上减一,即子协程的数量减一,通常使用 defer
来调用。Wait
:阻塞当前协程,直到实例里的计数器归零。信道可以实现多个协程间的通信,于是乎我们可以定义一个信道,在任务执行完成后,往信道中写入 true
,然后在主协程中获取到 true
,就可以认为子协程已经执行完毕。
package main
import "fmt"
func main() {
isDone := make(chan bool)
go func() {
for i := 0; i < 5; i++{
fmt.Println(i)
}
isDone <- true
}()
<- isDone
}
运行上面的程序,主协程就会等待创建的协程执行完毕后退出。
使用上面的信道方法,虽然可行,但在你程序中使用很多协程的话,你的代码就会看起来很复杂,这里就要介绍一种更好的方法,那就是使用 sync
包中提供的 WaitGroup 类型。WaitGroup
用于等待一批 Go 协程执行结束。程序控制会一直阻塞,直到这些协程全部执行完毕。当然 WaitGroup
也可以用于实现工作池。
WaitGroup
实例化后就能使用:
var name sync.WaitGroup
下面看具体示例:
package main
import (
"fmt"
"sync"
)
func task(taskNum int, wg *sync.WaitGroup) {
// 延迟调用 执行完子协程计数器减一
defer wg.Done()
// 输出任务号
for i := 0; i < 3; i++ {
fmt.Printf("task %d: %d\n", taskNum, i)
}
}
func main() {
// 实例化 sync.WaitGroup
var waitGroup sync.WaitGroup
// 传入子协程的数量
waitGroup.Add(3)
// 开启一个子协程 协程 1 以及 实例 waitGroup
go task(1, &waitGroup)
// 开启一个子协程 协程 2 以及 实例 waitGroup
go task(2, &waitGroup)
// 开启一个子协程 协程 3 以及 实例 waitGroup
go task(3, &waitGroup)
// 实例 waitGroup 阻塞当前协程 等待所有子协程执行完
waitGroup.Wait()
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。