首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >go channel实现简易eventbus

go channel实现简易eventbus

原创
作者头像
浩瀚星河
发布2025-08-07 18:51:37
发布2025-08-07 18:51:37
1220
举报
文章被收录于专栏:golanggolang

题目

  1. 适用于单机模块,主要用于异步事件分发
  2. 选用合适的数据结构,并编写合适的逻辑保证原子性
  3. 涵盖发布,订阅,取消订阅等功能

首先,选取合适的数据结构是最重要的,选取 map 来存储事件总线,因为一个事件对应多个 channels 选取这种映射结构性能比较好

代码语言:go
复制
type Eventbus struct {
	ec map[string]*EventChans // 设置为指针的目的是能够在订阅和取消订阅的时候原地修改
	mu sync.RWMutex
}

然后在并发编程中,map 是读写不安全的,所以设置一把读写锁保证原子性

具体的EventChans其实是一个存放 channel 的切片

代码语言:go
复制
type EventChans struct {
	query []chan int
	mu    sync.RWMutex
}

同样,在并发编程中,切片也是读写不安全的,所以设置一把读写锁保证原子性

定义接口

代码语言:go
复制
type IEventbus interface {
	Subscribe(event string, ch chan int)
	UnSubscribe(event string, ch chan int)
	Publish(event string, message int)
}

分别对应订阅、取消订阅和发布这三个方法

具体实现

订阅

代码语言:go
复制
func (eb *Eventbus) Subscribe(event string, ch chan int) chan int {
	// 对map加互斥锁
	eb.mu.Lock()
	ec, ok := eb.ec[event]
	if !ok {
		ec = &EventChans{}
		eb.ec[event] = ec // 初始化
	}
	eb.mu.Unlock()

	ec.mu.Lock()
	defer ec.mu.Unlock()
	ec.query = append(ec.query, ch)
	return ch // 返回用于监听广播的值
}
  1. 在订阅过程中,伴随着事件的增加,所以在访问 map 需要加一把互斥锁,在访问完再放开
  2. 在订阅过程中,需要新增 channel,需要加把互斥锁,在访问完放开
  3. 考虑没有对应事件的时候新增事件

取消订阅

代码语言:go
复制
func (eb *Eventbus) UnSubscribe(event string, ch chan int) {
	// 对map加互斥锁
	eb.mu.Lock()
	ec, ok := eb.ec[event]
	eb.mu.Unlock()
	if !ok {
		return // 没找到直接返回
	}

	ec.mu.Lock()
	defer ec.mu.Unlock()
	for i, ech := range ec.query {
		if ch == ech {
			ec.query = append(ec.query[0:i], ec.query[i+1:]...)
			break
		}
	}
}
  1. 在取消订阅过程中,其他事件可能修改 map,所以在访问 map 需要加一把互斥锁,在访问完再放开
  2. 在订阅过程中,需要减少 channel,需要加把互斥锁,在访问完放开
  3. 考虑没有事件的时候直接返回

发布

代码语言:go
复制
func (eb *Eventbus) Publish(event string, message int) {
	// 对map加读锁
	eb.mu.RLock()
	ec, ok := eb.ec[event]
	eb.mu.RUnlock()
	if !ok {
		return // 没找到直接返回
	}

	// 对切片加读锁
	ec.mu.RLock()
	defer ec.mu.RUnlock()
	for _, ch := range ec.query {
		ch <- message
	}
}
  1. 在发布过程中,map 允许被多个 goroutine 读取但是不允许被写入,所以在访问 map 加一把读锁,在访问完再放开
  2. 在发布过程中,不允许 channel 的增减,需要加把互斥锁,在访问完放开
  3. 考虑没有事件的时候直接返回

完整代码以及具体实现

代码语言:go
复制
package main

import (
	"fmt"
	"sync"
	"time"
)

type EventChans struct {
	query []chan int
	mu    sync.RWMutex
}

type Eventbus struct {
	ec map[string]*EventChans // 设置为指针的目的是能够在订阅和取消订阅的时候原地修改
	mu sync.RWMutex
}

func NewEventbus() *Eventbus {
	return &Eventbus{
		mu: sync.RWMutex{},
		ec: make(map[string]*EventChans, 10),
	}
}

type IEventbus interface {
	Subscribe(event string, ch chan int)
	UnSubscribe(event string, ch chan int)
	Publish(event string, message int)
}

func (eb *Eventbus) Subscribe(event string, ch chan int) chan int {
	// 对map加互斥锁
	eb.mu.Lock()
	ec, ok := eb.ec[event]
	if !ok {
		ec = &EventChans{}
		eb.ec[event] = ec // 初始化
	}
	eb.mu.Unlock()

	ec.mu.Lock()
	defer ec.mu.Unlock()
	ec.query = append(ec.query, ch)
	return ch // 返回用于监听广播的值
}

func (eb *Eventbus) UnSubscribe(event string, ch chan int) {
	// 对map加互斥锁
	eb.mu.Lock()
	ec, ok := eb.ec[event]
	eb.mu.Unlock()
	if !ok {
		return // 没找到直接返回
	}

	ec.mu.Lock()
	defer ec.mu.Unlock()
	for i, ech := range ec.query {
		if ch == ech {
			ec.query = append(ec.query[0:i], ec.query[i+1:]...)
			break
		}
	}
}

func (eb *Eventbus) Publish(event string, message int) {
	// 对map加读锁
	eb.mu.RLock()
	ec, ok := eb.ec[event]
	eb.mu.RUnlock()
	if !ok {
		return // 没找到直接返回
	}

	// 对切片加读锁
	ec.mu.RLock()
	defer ec.mu.RUnlock()
	for _, ch := range ec.query {
		ch <- message
	}
}

func subscribe(eb *Eventbus) {
	ch := make(chan int, 1)
	eb.Subscribe("event1", ch)
	wg.Done()
	fmt.Println("订阅完毕,开始等待接收")
	fmt.Println("接收到值:", <-ch)
}

var wg sync.WaitGroup

func main() {
	eb := NewEventbus()
	go subscribe(eb)

	// 发布事件

	wg.Add(1)
	wg.Wait() // 等待订阅成功
	eb.Publish("event1", 666)
	time.Sleep(time.Second)
}

上述程序是萌新在学习 channel 的一个案例,如果有不足的的地方请指出

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 题目
  • 定义接口
  • 具体实现
    • 订阅
    • 取消订阅
    • 发布
  • 完整代码以及具体实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档