作者:山楂大卷 链接:https://www.jianshu.com/p/215510810c59 來源:简书
系统性地介绍golang基础的资料实在太多了,这里不再一一赘述。本文的思路是从另一个角度来由浅入深地探究下Go程序的套路。毕竟纸上得来终觉浅,所以,能动手就不要动口。有时候几天不写代码,突然间有一天投入进来做个东西,才恍然发觉,也只有敲代码的时候,才能找回迷失的自己,那可以忘掉一切的不开心。
1package main
2
3import (
4 "fmt"
5)
6
7func main() {
8 fmt.Println("hello world")
9}
go程序结构从整体上来说就是这样的,第一行看起来这一定就是包头声明了,程序以包为单位,一个文件夹是一个包,一个包下可能有多个文件,但是包名都是同一个。相对C/C++程序的include来说,这里是import,后面跟的就是别的包名,一个包里定义的变量或类型,本包内都可见,若首字母大写,则可以被导出。如果引入了程序里不使用的包,编译会报错,报错,错。声明不使用的变量也一样,对,会报错。这里行尾没有分号,左大括号必须那样放,缩进也不用你操心等等,编码风格中的很多问题在这里都不再是问题,是的,go fmt帮你都搞定了,所以你看绝大部分go程序风格都好接近的。写一段时间代码后,你会发现,这种风格确实简单,干净利落。
通过一些概念的学习和介绍,设计并实现个线程池,相信很多地方都可能用到这种模型或各种变形。
变量的声明、定义、赋值、指针等不想啰嗦了,去别的地方学吧。
我们先来定义一个结构体吧
1package package1
2
3type User struct {
4 Name string
5 addr int
6 age int
7}
你一定注意到了,Name首字母是大写的,在package2包中,import package1后就可以通过user.Name访问Name成员了,Name是被导出的。但addr和age在package2中就不能直接访问了,这俩没有被导出,只能在package1包中被直接访问,也就是私有的。那如何在package2中获取没有被导出的成员呢?我们来看下方法。
1func (u User) GetAge() string {
2 return u.age
3}
4
5func(u *User) SetAge(age int){
6 u.age = age
7}
方法的使用和C++或者Java都很像的。下面代码段中user的类型是*User,你会发现,无论方法的接收者是对象还是指针,方法调用时都只用.,而代表指针的->已经不在了。
1user := &User{
2 Name: name,
3 addr: addr,
4 age: age,
5}
6user.SetAge(100)
7fmt.Println(user.GetAge())
还有常用的构造对象的方式是这样的
1func NewUser(name string, addr string, age int) *User {
2 return &User{
3 Name: name,
4 addr: addr,
5 age: age,
6 }
7}
1 user := new(User)
2 user := &User{}//与前者等价
3 user := User{}
Go中没有继承,没有了多态,也没有了模板。争论已久的继承与组合问题,在这里也不是问题了,因为已经没得选择了。比如我想实现个线程安全的整型(假设只用++和--),可能这么来做
1type safepending struct {
2 pending int
3 mutex sync.RWMutex
4}
5
6func (s *safepending) Inc() {
7 s.mutex.Lock()
8 s.pending++
9 s.mutex.Unlock()
10}
11
12func (s *safepending) Dec() {
13 s.mutex.Lock()
14 s.pending--
15 s.mutex.Unlock()
16}
17
18func (s *safepending) Get() int {
19 s.mutex.RLock()
20 n := s.pending
21 s.mutex.RUnlock()
22 return n
23}
也可以用嵌套写法
1type safepending struct {
2 pending int
3 *sync.RWMutex
4}
5
6func (s *safepending) Inc() {
7 s.Lock()
8 s.pending++
9 s.Unlock()
10}
11
12func (s *safepending) Dec() {
13 s.Lock()
14 s.pending--
15 s.Unlock()
16}
17
18func (s *safepending) Get() int {
19 s.RLock()
20 n := s.pending
21 s.RUnlock()
22 return n
23}
这样safepending类型将直接拥有sync.RWMutex类型中的所有属性,好方便的写法。
一个interface类型就是一个方法集,如果其他类型实现了interface类型中所有的接口,那我们就可以说这个类型实现了interface类型。举个例子:空接口interface{}包含的方法集是空,也就可以说任何类型都实现了它,也就是说interface{}可以代表任何类型,类型直接的转换看下边的例子吧。
首先定义一个worker结构体, worker对象中存放很多待处理的request,pinding代表待处理的request数量,以worker为元素,实现一个小顶堆,每次Pop操作都返回负载最低的一个worker。 golang标准库中提供了heap结构的容器,我们仅需要实现几个方法,就可以实现一个堆类型的数据结构了,使用时只需要调用标准库中提供的Init初始化接口、Pop接口、Push接口,就可以得到我们想要的结果。我们要实现的方法有Len、Less、Swap、Push、Pop,请看下边具体代码。另外值得一提的是,山楂君也是通过标准库中提供的例子学习到的这个知识点。
1type Request struct {
2 fn func() int
3 data []byte
4 op int
5 c chan int
6}
7
8type Worker struct {
9 req chan Request
10 pending int
11 index int
12 done chan struct{}
13}
14
15type Pool []*Worker
16
17func (p Pool) Len() int {
18 return len(p)
19}
20func (p Pool) Less(i, j int) bool {
21 return p[i].pending < p[j].pending
22}
23
24func (p Pool) Swap(i, j int) {
25 p[i], p[j] = p[j], p[i]
26 p[i].index = i
27 p[j].index = j
28}
29
30func (p *Pool) Push(x interface{}) {
31 n := len(*p)
32 item := x.(*Worker)
33 item.index = n
34 *p = append(*p, item)
35}
36
37func (p *Pool) Pop() interface{} {
38 old := *p
39 n := len(*p)
40 item := old[n-1]
41 //item.index = -1
42 *p = old[:n-1]
43 return item
44}
1package main
2
3import (
4 "container/heap"
5 "log"
6 "math/rand"
7)
8
9var (
10 MaxWorks = 10000
11 MaxQueue = 1000
12)
13
14func main() {
15 pool := new(Pool)
16 for i := 0; i < 4; i++ {
17 work := &Worker{
18 req: make(chan Request, MaxQueue),
19 pending: rand.Intn(100),
20 index: i,
21 }
22 log.Println("pengding", work.pending, "i", i)
23 heap.Push(pool, work)
24 }
25
26 heap.Init(pool)
27 log.Println("init heap success")
28 work := &Worker{
29 req: make(chan Request, MaxQueue),
30 pending: 50,
31 index: 4,
32 }
33 heap.Push(pool, work)
34 log.Println("Push worker: pending", work.pending)
35 for pool.Len() > 0 {
36 worker := heap.Pop(pool).(*Worker)
37 log.Println("Pop worker:index", worker.index, "pending", worker.pending)
38 }
39}
程序的运行结果如下,可以看到每次Pop的结果都返回一个pending值最小的一个work元素。
12017/03/11 12:46:59 pengding 81 i 0
22017/03/11 12:46:59 pengding 87 i 1
32017/03/11 12:46:59 pengding 47 i 2
42017/03/11 12:46:59 pengding 59 i 3
52017/03/11 12:46:59 init heap success
62017/03/11 12:46:59 Push worker: pending 50
72017/03/11 12:46:59 Pop worker:index 4 pending 47
82017/03/11 12:46:59 Pop worker:index 3 pending 50
92017/03/11 12:46:59 Pop worker:index 2 pending 59
102017/03/11 12:46:59 Pop worker:index 1 pending 81
112017/03/11 12:46:59 Pop worker:index 0 pending 87
细心的你肯能会发现,不是work么,怎么没有goroutine去跑任务?是的山楂君这里仅是演示了小顶堆的构建与使用,至于如何用goroutine去跑任务,自己先思考一下吧。 其实加上类似于下边这样的代码就可以了
1func (w *Worker) Stop() {
2 w.done <- struct{}{}
3}
4
5func (w *Worker) Run() {
6 go func() {
7 for {
8 select {
9 case req := <-w.req:
10 req.c <- req.fn()
11 case <-w.done:
12 break
13 }
14 }
15 }()
16}
golang中的并发机制很简单,掌握好goroutine、channel以及某些程序设计套路,就能用的很好。当然,并发程序设计中存在的一切问题与语言无关,只是每种语言中基础设施对此支持的程度不一,Go程序中同样都要小心。
官方对goroutine的描述:
They're called goroutines because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations. A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required. Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management. Prefix a function or method call with the go keyword to run the call in a new goroutine. When the call completes, the goroutine exits, silently. (The effect is similar to the Unix shell's & notation for running a command in the background.)
启动一个goroutine,用法很简单:
1go DoSomething()
看channel的描述:
A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type. The value of an uninitialized channel is nil.
简而言之,就是提供了goroutine之间的同步与通信机制。
Don't communicate by sharing memory; share memory by communicating
这就是Go程序中很重要的一种程序套路。拿一个具体的小应用场景来说吧:一个Map类型的数据结构,其增删改查操作可能在多个线程中进行,我们会用什么样的方案来实现呢?
对于方案3其实就是对Go程序这种套路的小应用,这种思想当然和语言无关,但是在Go语言中通过“通信”来共享内存的思路非常容易实现,有原生支持的goroutine、channel、select、gc等基础设施,也许你会有"大消息"传递场景下的性能顾虑,但channel是支持引用类型的传递的,且会自动帮你进行垃圾回收,一个大结构体的引用类型实际上可能才占用了十几个字节的空间。这实在是省去了山楂君很多的功夫。看Go程序的具体做法:
1type job struct {
2 // something
3}
4
5type jobPair struct {
6 key string
7 value *job
8}
9
10type worker struct {
11 jobqueue map[string]*job // key:UserName
12 jobadd chan *jobPair
13}
14
15// 并不是真正的map insert操作,仅发消息给另外一个线程
16func (w *worker) PushJob(user string, job *job) {
17 pair := &jobPair{
18 key: user,
19 value: job,
20 }
21 w.jobadd <- pair
22}
23
24// 并不是真正的map delete操作,仅发消息给另外一个线程
25func (w *worker) RemoveJob(user string) {
26 w.jobdel <- user
27}
28
29func (w *worker) Run() {
30 go func() {
31 for {
32 select {
33 case jobpair := <-w.jobadd:
34 w.insertJob(jobpair.key, jobpair.value)
35 case delkey := <-w.jobdel:
36 w.deleteJob(delkey)
37 //case other channel
38 // for _, job := range w.jobqueue {
39 // do something use job
40 // log.Println(job)
41 // }
42 }
43 }
44 }()
45}
46func (w *worker) insertJob(key string, value *job) error {
47 w.jobqueue[key] = value
48 w.pending.Inc()
49 return nil
50}
51
52func (w *worker) deleteJob(key string) {
53 delete(w.jobqueue, key)
54 w.pending.Dec()
55}
模型详见下边流程图
线程池模型.png
由具体业务的生产者线程生成一个个不同的job,通过共同的Balance均衡器,将job分配到不同的worker去处理,每个worker占用一个goroutine。在job数量巨多的场景下,这种模型要远远优于一个job占用一个goroutine的模型。并且可以根据不同的业务特点以及硬件配置,配置不同的worker数量以及每个worker可以处理的job数量。
我们可以先定义个job结构体,根据业务不同,里边会包含不同的属性。
1type job struct {
2 conn net.Conn
3 opcode int
4 data []byte
5 result chan ResultType //可能需要返回处理结果给其他channel
6}
7type jobPair struct {
8 key string
9 value *job
10}
然后看下worker定义
1type worker struct {
2 jobqueue map[string]*job // key:UserName
3 broadcast chan DataType
4 jobadd chan *jobPair
5 jobdel chan string
6 pending safepending
7 index int
8 done chan struct{}
9}
10
11func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
12 return &worker{
13 jobqueue: make(map[string]*job, queue_limit),
14 broadcast: make(chan DataType, source_limit), //4家交易所
15 jobadd: make(chan jobPair, jobreq_limit),
16 jobdel: make(chan string, jobreq_limit),
17 pending: safepending{0, sync.RWMutex{}},
18 index: idx,
19 done: make(chan struct{}),
20 }
21}
22
23func (w *worker) PushJob(user string, job *job) {
24 pair := jobPair{
25 key: user,
26 value: job,
27 }
28 w.jobadd <- pair
29}
30
31func (w *worker) RemoveJob(user string) {
32 w.jobdel <- user
33}
34
35func (w *worker) Run(wg *sync.WaitGroup) {
36 wg.Add(1)
37 go func() {
38 log.Println("new goroutine, worker index:", w.index)
39 defer wg.Done()
40 ticker := time.NewTicker(time.Second * 60)
41 for {
42 select {
43 case data := <-w.broadcast:
44 for _, job := range w.jobqueue {
45 log.Println(job, data)
46 }
47 case jobpair := <-w.jobadd:
48 w.insertJob(jobpair.key, jobpair.value)
49 case delkey := <-w.jobdel:
50 w.deleteJob(delkey)
51 case <-ticker.C:
52 w.loadInfo()
53 case <-w.done:
54 log.Println("worker", w.index, "exit")
55 break
56 }
57 }
58 }()
59}
60
61func (w *worker) Stop() {
62 go func() {
63 w.done <- struct{}{}
64 }()
65}
66func (w *worker) insertJob(key string, value *job) error {
67 w.jobqueue[key] = value
68 w.pending.Inc()
69 return nil
70}
71
72func (w *worker) deleteJob(key string) {
73 delete(w.jobqueue, key)
74 w.pending.Dec()
75}
结合上边提到的小顶堆的实现,我们就可以实现一个带负载均衡的线程池了。 一种模式并不能应用于所有的业务场景,山楂君觉得重要的是针对不同的业务场景去设计或优化编程模型的能力,以上有不妥之处,欢迎吐槽或指正,喜欢也可以打赏。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
Golang语言社区
ID:Golangweb
www.bytedancing.com
游戏服务器架构丨分布式技术丨大数据丨游戏算法学习