package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := boring("boring!") // Function returning a channel.
for i := 0; i < 5; i++ {
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
boring返回一个channel,不断往里写数据。main调用,并从channel中获取数据,结果如下:
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring; I'm leaving.
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
joe := boring("Joe")
ann := boring("Ann")
for i := 0; i < 5; i++ {
fmt.Println(<-joe)
fmt.Println(<-ann) // ann will block if joe is not ready to give a value
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
返回多个boring服务,channel做服务的句柄(也就是唯一标识)。返回channel时,通道没有数据会阻塞,按顺序来即可保证输出顺序。 结果如下:
Joe: 0
Ann: 0
Joe: 1
Ann: 1
Joe: 2
Ann: 2
Joe: 3
Ann: 3
Joe: 4
Ann: 4
You're both boring; I'm leaving.
但是当joe 阻塞的时候,就会阻塞ann,程序会阻塞在Joe读取这里。
多个channel 并入一个channel 输出, 不考虑顺序,随机输出
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c) // HL
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
c <- <-input1
}
}()
go func() {
for {
c <- <-input2
}
}()
return c
}
输出类似如下, 不考虑顺序时可以这样使用,类似开车时的汇入,“前方小心右侧车辆汇入”。
Ann: 0
Joe: 0
Ann: 1
Joe: 1
Ann: 2
Joe: 2
Ann: 3
Ann: 4
Ann: 5
Joe: 3
You're both boring; I'm leaving.
input1、input2和c的关系如下图所示:
使用channel 阻塞打印,使得打印按照顺序进行打印.
package main
import (
"fmt"
"math/rand"
"time"
)
type Message struct {
str string
wait chan bool
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 5; i++ {
msg1 := <-c; fmt.Println(msg1.str)
msg2 := <-c; fmt.Println(msg2.str)
msg1.wait <- true // block boring, false is also ok
msg2.wait <- true
}
fmt.Println("You're both boring; I'm leaving.")
}
func boring(msg string) <-chan Message { // Returns receive-only channel of strings.
c := make(chan Message)
waitForIt := make(chan bool) // Shared between all messages.
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(2e3)) * time.Millisecond)
<-waitForIt // to be blocked
}
}()
return c // Return the channel to the caller.
}
func fanIn(inputs ... <-chan Message) <-chan Message {
c := make(chan Message)
for i := range inputs {
input := inputs[i] // New instance of 'input' for each loop.
go func() { for { c <- <-input } }()
}
return c
}
结果如下
Joe: 0
Ann: 0
Joe: 1
Ann: 1
Joe: 2
Ann: 2
Joe: 3
Ann: 3
Joe: 4
Ann: 4
You're both boring; I'm leaving.
乱序到达,通过通道形成加锁同步。通道作为锁,
<-waitForIt
会阻塞,直到有数据。
因为没有设置缓冲区,强行将输出同步, 作为知识点了解,实际作用不大。
与switch语句相比,select有比较多的限制,其中最大的一条限制就是每个case语句后面必须是一个Io操作 。
select 作用主要是配合channel实现IO多路复用, 更多多路复用可以查看下面参考资料
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
fmt.Println("You're both boring1; I'm leaving.")
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1:
c <- s
case s := <-input2:
c <- s
}
}
}()
return c
}
fanIn不再使用多个goroutine了,而是使用一个goroutine,在其中有无限循环和select。
Ann: 0
Joe: 0
Ann: 1
Joe: 1
Ann: 2
Joe: 2
Ann: 3
Joe: 3
Ann: 4
Joe: 4
You're both boring1; I'm leaving.
package main
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().Unix()) // 这里多加一个rand 随机数
}
func main() {
c := boring("Joe")
for {
select {
case s := <-c:
fmt.Println(s)
case <-time.After(1 * time.Second): // if you change it to more than 1.5 seconds it will not block, eg. 5
fmt.Println("You're too slow.") // it's time to stop last case
return // 这里必须是return 不能是break
}
}
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
因为两个chan都是阻塞的,每次for循环,得到的time.After都是新的1秒,当数字随机的数字大于1秒时,则退出。
有的时候boring服务可能是页面访问,获取资源等服务,我们并不清楚需要多长时间,但是我们有一个时间上限。这个时候可以使用库函数,time.After,到达等待时间后它会返回一个channel,这时我们可以退出程序。
Joe: 0
You're too slow.
在使用select 时候一般我们配合for循环用,如果想跳出for,要用 return进行跳出,不要用break,因为break只会跳出select ,不会跳出for循环
如果在主协程(main)的select 内用runtime.Goexit() 进行退出 会报错: no goroutines (main called runtime.Goexit) - deadlock! ,还是要用return
总体超时需要将timeout 定义在for循环之外
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
c := boring("Joe")
timeout := time.After(5 * time.Second)
for {
select {
case s := <-c:
fmt.Println(s)
case <-timeout:
fmt.Println("You talk too much.")
return
}
}
}
func boring(msg string) <-chan string { // Returns receive-only channel of strings.
c := make(chan string)
go func() { // We launch the goroutine from inside the function.
for i := 0; ; i++ {
c <- fmt.Sprintf("%s: %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
我们只定义一个timeout,到达后就退出。
Joe: 0
Joe: 1
Joe: 2
Joe: 3
Joe: 4
Joe: 5
Joe: 6
Joe: 7
Joe: 8
You talk too much.
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, _ := context.WithTimeout(context.TODO(), 3*time.Second) //定义一个超时的 context
stop := make(chan struct{})
go func() {
time.Sleep(4 * time.Second)
close(stop)
}()
for {
select {
case <-stop:
fmt.Println("Stop ... ")
return
case <-ctx.Done():
fmt.Println("context ... ")
return
}
}
}
结果是
context ...
在循环前定义time.Tick
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
timeout := time.After(3 * time.Second)
timetrick := time.Tick(time.Second)
c := make(chan int)
go func() {
for {
c <- 0
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
}()
for {
select {
case <-c:
fmt.Println("I'm working...")
case <-timetrick:
fmt.Println("1 second pass")
case <-timeout:
fmt.Println("3 second")
return
}
}
}
结果
I'm working...
I'm working...
I'm working...
I'm working...
I'm working...
I'm working...
1 second pass
I'm working...
I'm working...
I'm working...
1 second pass
I'm working...
I'm working...
I'm working...
I'm working...
I'm working...
I'm working...
3 second
定义一个quit 来控制退出
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
quit := make(chan bool)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- {
fmt.Println(<-c)
}
// modify by lady_killer9
fmt.Println("You're boring!")
quit <- true
}
func boring(msg string, quit <-chan bool) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
return
}
}
}()
return c
}
想退出时,在select的某个return的case对应的channel中写入数据
Joe: 0
Joe: 1
You're boring!
退出时做清理
package main
import (
"fmt"
"math/rand"
"time"
)
func cleanup() {
// added by lady_killer9
fmt.Println("clean up ")
}
func main() {
quit := make(chan string)
c := boring("Joe", quit)
for i := rand.Intn(10); i >= 0; i-- {
fmt.Println(<-c)
}
quit <- "Bye!"
fmt.Printf("Joe says: %q\n", <-quit)
}
func boring(msg string, quit chan string) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
select {
case c <- fmt.Sprintf("%s: %d", msg, i):
// do nothing
case <-quit:
cleanup()
quit <- "See you!"
return
}
}
}()
return c
}
有的时候,我们收到要关闭的消息后,需要进行文件关闭等清理工作,之后再告诉主程序我们清理完毕,可以退出了,防止内存泄露,文件占用等情况。
Joe: 0
Joe: 1
clean up
Joe says: "See you!"
package main
import (
"fmt"
"time"
)
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
const n = 10000
leftmost := make(chan int)
right := leftmost
left := leftmost
for i := 0; i < n; i++ {
right = make(chan int)
go f(left, right)
left = right
}
start := time.Now()
go func(c chan int) { c <- 1 }(right)
fmt.Println(<-leftmost)
fmt.Println(time.Since(start))
}
类似于传话筒游戏,我们不断的右耳朵进,左耳朵出。这里使用了10000个goroutine,结果如下
10001
6.955789ms
注: 博客里是3.5ms左右,可能是我机器还跑了其他东西的原因
大概花了3ms多一点,就完成了10000个goroutine的通信,如果使用Python等其他语言是很难达到的,这就是goroutine,简单,高效。
Google搜索是一个很好的例子,我们输入问题,然后Google发给多个后端程序进行搜索,可能是网页,图片,视频等,最后将结果进行一个汇总并返回。接下来进行一个仿造:
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
func Google(query string) (results []Result) {
results = append(results, Web(query))
results = append(results, Image(query))
results = append(results, Video(query))
return
}
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
在Google时,只是将结果放入结果队列依次放入会等待上一个结果出来。
[web result for "golang"
image result for "golang"
video result for "golang"
]
84.514029ms
等待太浪费时间了,我们可以使用goroutine
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) }()
go func() { c <- Image(query) }()
go func() { c <- Video(query) }()
for i := 0; i < 3; i++ {
result := <-c
results = append(results, result)
}
return
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
使用goroutine就不必等待上一个结果出来 (使用并发扇入这种方式,将调用方法并发)
[image result for "golang"
web result for "golang"
video result for "golang"
]
63.255893ms
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) }()
go func() { c <- Image(query) }()
go func() { c <- Video(query) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
在Google 2.0的fan In模式的基础上,增加了总体超时模式,超过时不再等待其他结果。
timed out
[web result for "golang"
image result for "golang"
]
81.379159ms
我们如何避免丢弃慢速服务器的结果呢?例如,上面的video被丢弃了 答:复制服务器。向多个副本发送请求,并使用第一个响应。
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
result := First("golang",
fakeSearch("replica 1"),
fakeSearch("replica 2"))
elapsed := time.Since(start)
fmt.Println(result)
fmt.Println(elapsed)
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
对于同一个问题,我们启用多个副本,返回最快的服务器搜索到的结果
replica 1 result for "golang"
1.388248ms
随机时间不同,导致传入最先传入c的被拿到,然后return了
接下来,我们针对web、image、video都启用多个服务器进行搜索
package main
import (
"fmt"
"math/rand"
"time"
)
type Result string
type Search func(query string) Result
var (
Web1 = fakeSearch("web1")
Web2 = fakeSearch("web2")
Image1 = fakeSearch("image1")
Image2 = fakeSearch("image2")
Video1 = fakeSearch("video1")
Video2 = fakeSearch("video2")
)
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, Web1, Web2) }()
go func() { c <- First(query, Image1, Image2) }()
go func() { c <- First(query, Video1, Video2) }()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout:
fmt.Println("timed out")
return
}
}
return
}
func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) {
c <- replicas[i](query)
}
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}
通过多个副本,选择最快的一个的方式,基本可以保证每种类型的结果都能在超时时间内完成。
[web2 result for "golang"
video1 result for "golang"
image2 result for "golang"
]
37.218453ms
go开启一个协程,并不知道什么时候会结束,其他语言一般会有callback,在go中,函数可以作为参数实现callback,这里再说一种channel实现callback的思路。
开启的协程,可以在结束后像一个channel中写入值,main routine中读取,即可实现阻塞直到完成。
package main
import (
"fmt"
"math/rand"
"time"
)
func doSomething(event string, ch chan<- bool) {
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
ch <- true
}
func main() {
ch := make(chan bool)
go doSomething("event", ch)
for {
select {
case <-ch:
fmt.Println("finish...")
return
}
}
}
一般是等待第一个协程结束或所有协程结束,如果多个协程的话可以计数。
sync里面有WaitGroup, 可以wait所有协程
举个例子,假设厨师做饭一会儿做的快,可能是凉菜,一会儿做的慢,可能是佛跳墙。服务员的端菜速度是一定的,如果菜没有地方放,只能等待服务员拿的话,就会很慢,因为厨师做快了没有地方放(阻塞),做慢了的话服务员要一直等(阻塞)。
package main
import (
"fmt"
"math/rand"
"time"
)
var food = make(chan string)
func cook(foods []string) {
for _, f := range foods {
if f == "凉菜" {
time.Sleep(100 * time.Millisecond)
food <- "凉菜"
} else {
time.Sleep(800 * time.Millisecond)
food <- "佛跳墙"
}
}
}
func server(finish chan bool) {
for {
select {
case name := <-food:
time.Sleep(time.Duration(rand.Intn(600)) * time.Millisecond)
fmt.Println("客官,上菜了:", name)
finish <- true
default:
//fmt.Println("服务员在等待...")
}
}
}
func main() {
foods := []string{"凉菜", "凉菜", "凉菜", "凉菜", "凉菜", "凉菜", "佛跳墙", "凉菜"}
n := len(foods)
cnt := 0
finish := make(chan bool)
start := time.Now()
go cook(foods)
go server(finish)
for {
select {
case <-finish:
{
cnt += 1
if cnt == n {
fmt.Println(time.Since(start))
return
}
}
}
}
}
结果是2秒多
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 佛跳墙
客官,上菜了: 凉菜
2.22571934s
上面代码是没有缓存的,会慢一些,如果把food改为
var food = make(chan string,6)
结果是1.6秒, 比不带buff 阻塞的快了些
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 凉菜
客官,上菜了: 佛跳墙
客官,上菜了: 凉菜
1.666686866s
Go-并发模式总结(扇入模式,超时模式,callback模式等)
这个参考里的源码已经没用kind类型了,其他说明还可以看看。