Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Go协程加管道实现异步批量消费调度任务

Go协程加管道实现异步批量消费调度任务

原创
作者头像
pooky
修改于 2021-03-28 07:59:33
修改于 2021-03-28 07:59:33
2.8K00
代码可运行
举报
文章被收录于专栏:开发随笔开发随笔
运行总次数:0
代码可运行

周末了,这周遇到个问题当时没想明白,周末整理下

题目有点绕口 在现实的项目中这么搞的也不常见,里面牵涉多个知识点,整理下就当学习了。

程序需求:

1:接收任务,从异步消息队列里面监控接收最新的任务消息

2:处理任务,每个任务耗时可能不定

我们常规的做法就是开启一个长监听串行化来一个执行一个,实在不行就多开几个,这种呢人工干预比较重,有时候还盯着不是太好。

程序方案:

1:异步接受消息 ,开启一个协程接受消息 这个不用多开接收消息不会成为瓶颈

2:异步处理消息,开启协程异步处理对应的消息,这里有点要注意是一个消息就开一个处理协程 还是多个消息开启,是值得思考的。

3:批量处理,多个消息开启一个处理协程,防止开启过多的协程,

4:超时处理,如果长时间没有达到批量处理的数量限制,那么也要及时处理

5:限制过多的协程,这个其实没有什么必要,因为go所谓百万协程性能但是既然搞了这个例子 那就完善下吧

代码实现:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"runtime"
	"strconv"
	"time"
)

var msgChanLimit = 20 //消息管道大小限制
var handleId int      //协程任务自定义id 用来区分 查看对应的任务批次 每处理一次 ++
/*
periodType 消息产生时间
	1 1秒一个信息会走到超时处理模块;
	2 1毫秒一个信息 会一直走正常处理模块;
	3 随机0秒到 1秒 模拟都会触发的情况
*/
var periodType = 3

var batchNum = 6  // 最多堆积多少未处理信息 进行一批次处理
var batchTime = 3 // 在未满消息 最长多久 进行一批次处理

var maxGoroutines = 10 //最大的协程数量 这个其实感觉意义不大  但是还是搞一个  边界预防吧

//接受消息
func getMsg(msgChan chan string) {
	msgId := 0
	for { //循环接收消息
		msgId++
		msg := "Msg id:" + strconv.Itoa(msgId) + "; AddTime:" + time.Now().Format("2006-01-02 15:04:05")
		msgChan <- msg
		switch periodType {
		case 1:
			time.Sleep(1 * time.Second) // 秒
		case 2:
			time.Sleep(1 * time.Millisecond) // 毫秒
		case 3:
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond) // 随机
		}
	}
}

// 处理消息
func handleMsg(model int, handleId int, msgSet []string, guard chan struct{}) {
	for i, v := range msgSet {
		fmt.Println("HandleId:", handleId, ";Model", model, ">>> Idx:"+strconv.Itoa(i)+";Content:"+v)
		time.Sleep(1500 * time.Millisecond) //模拟具体处理消息是有耗时的 间隔越大 那么多个后台会有越多的挂起的处理
	}
	<-guard //释放一个限量

}

//无阻塞去接受消息
func unBlockRead(ch chan string) (msg string, err error) {
	select {
	case msg = <-ch:
		return msg, nil
	case <-time.After(time.Microsecond):
		return "", errors.New("nil")
	}
}

func main() {
	guard := make(chan struct{}, maxGoroutines) //守护协程数量限制
	msgChan := make(chan string, msgChanLimit)  //接受消息channel大小
	go getMsg(msgChan)                          // 开始接收消息
	msgSet := make([]string, 0)                 // 临时存放接收到的消息集合
	step := 0                                   //秒计数器 对应秒

	for { //主逻辑处理 开始处理
		if msg, err := unBlockRead(msgChan); err == nil { //接收到消息
			msgSet = append(msgSet, msg)
			if len(msgSet) == batchNum { //达到处理数量
				handleId++
				guard <- struct{}{}
				go handleMsg(1, handleId, msgSet, guard) // 处理当前的msgSet
				msgSet = nil                             //重置
				step = 0
			}
		} else {
			if step > batchTime && len(msgSet) > 0 { // 超时并且不为空
				handleId++
				guard <- struct{}{}
				go handleMsg(2, handleId, msgSet, guard)
				msgSet = nil //重置
				step = 0
			} else {
				step++
				time.Sleep(1 * time.Second) //休息一秒 step++
			}
		}
	}

	// 挂起主进程 防止退出
	for {
		runtime.GC()
	}
}

整体的代码就是这样,具体的可以看注释了(癖好这东西 the more the more)

运行效果

就这样,里面牵涉多个小点,比较有意思

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
为什么这段代码会阻塞?
2. 这段代码开启了一个 goroutine,这个goroutine会向 in 通道中放入2000个 Content 对象,每个对象的 i 字段从0到1999。每放入一个对象都会记录日志。
腾讯云开发者
2024/09/10
4190
为什么这段代码会阻塞?
golang 裸写一个pool池控制协程的大小
http://mp.weixin.qq.com/s?__biz=MjM5OTcxMzE0MQ==&mid=2653369770&idx=1&sn=044be64c577a11a9a13447b373e
李海彬
2018/03/26
1K0
Golang实现生产者-消费者的(N:1)模型
素履coder
2023/07/11
8220
大量连接时使用 使用epoll管理 or golang 多协程
最近开发了一个针对游戏服务器框架测压机器人, 当大量的机器人连接建立时, 通常的做法是给每个机器人配置一个网络read协程, 但是当机器人数量比较高时, 有点担心协程的竞争, 正好无意中看到一个百万级连接服务器的文章,学习了一下epoll对conn io的管理, 自己鼓捣了一下,写了个测试工程, 现将代码分享一下, 有兴趣的可以参考参考
IT工作者
2022/07/21
7400
Golang学习笔记之并发.协程(Goroutine)、信道(Channel)
简单的理解一下,并发就是你在跑步的时候鞋带开了,你停下来系鞋带。而并行则是,你一边听歌一边跑步。 并行并不代表比并发快,举一个例子,当文件下载完成时,应该使用弹出窗口来通知用户。而这种通信发生在负责下载的组件和负责渲染用户界面的组件之间。在并发系统中,这种通信的开销很低。而如果这两个组件并行地运行在 CPU 的不同核上,这种通信的开销却很大。因此并行程序并不一定会执行得更快。 Go 原生支持并发。在Go中,使用 Go 协程(Goroutine)和信道(channel)来处理并发。
李海彬
2018/12/27
1.4K0
Golang学习笔记之并发.协程(Goroutine)、信道(Channel)
go 并发模式
boring返回一个channel,不断往里写数据。main调用,并从channel中获取数据,结果如下:
solate
2022/10/28
5800
go 并发模式
【共识算法】--“raft的实现”
看过之前几期的朋友们应该知道在1号第1期最初的时候就实现过一次raft,但之前实现基本是基于python实现的,这次可结合着PBFT,用golang实现了raft。
帆说区块链
2022/04/26
4860
【共识算法】--“raft的实现”
Go两周入门系列-协程(goroutine)
协程是Go语言的关键特性,主要用于并发编程,协程是一种轻量级的线程,因为协程开销比较小,所以创建上万的协程也不是什么难事,下面介绍协程的基本用法。
用户10002156
2023/10/05
2880
Go两周入门系列-协程(goroutine)
(四十三)golang--管道
计算1-200之间各个数的阶乘,并将每个结果保存在map中,最终显示出来,要求使用goroutine。
西西嘛呦
2020/08/26
5470
Golang | 优雅的计算接口耗时、接口限流以及接口超时处理思路
描述: Goglang 接口耗时监控测试用例 核心:使用 defer + 匿名函数 再加上 time.Since() 函数实现再程序结束完毕时计算此代码片段(接口)执行耗时 示例:
全栈工程师修炼指南
2023/10/31
1.2K0
Golang | 优雅的计算接口耗时、接口限流以及接口超时处理思路
多协程如何使用channel优雅的收集结果
Go语言里面最具特色的就是他的协程和 channel ,有了它们以后我们可以非常方便的处理多线程的问题。
小锟哥哥
2022/05/10
8950
多协程如何使用channel优雅的收集结果
如何控制golang协程的并发数量问题
最近搞压测,写了一个压测的工具,就是想通过go来实现一秒发多少个请求,然后我写了一段这样的代码,如下,当然压测的代码我就没有贴了,我贴了主要核心代码,主要是起一个定时器,然后通过但仅此去读定时器的通道,然后for循环起goroutine,goroutine里面是进行并发的逻辑。
公众号-利志分享
2022/04/25
2.2K0
Go并发调用协程goroutine并通过管道chan收集返回值
这里整理一下go开发当中用到了并发协程多任务,同时收集返回多任务结果,go 协程没有直接返回,只能通过chan返回收集,其中用到几个特性
pooky
2020/10/30
8K2
Golang Channel 实战技巧和说明
channel 不需要通过 close 来释放资源,这个是它与 socket、file 等不一样的地方,对于 channel 而言,唯一需要 close 的就是我们想通过 close 触发 channel 读事件。
Allen.Wu
2022/11/29
7330
Go语言(十 八)context&日志项目
context&日志项目 context 一般场景下取消goroutine的方法 var wg sync.WaitGroup var exit bool func worker(exitChan chan struct{}) { LOOP: for { fmt.Printf("work\n") time.Sleep(time.Second) /*if exit { break } */ select
alexhuiwang
2020/09/23
3680
Go语言(十 八)context&日志项目
Kafka 并发消费单个 partition
kafka可以通过多个partition实现并发,但是针对单个partition,必须顺序提交。假如消息发送顺序为1,2,3,如果先提交3,会导致1,2被提交。所以不能并发执行后立即提交。
Yuyy
2023/05/01
1K0
Golang中的管道(channel) 、goroutine与channel实现并发、单向管道、select多路复用以及goroutine panic处理
管道(channel)是 Go 语言中实现并发的一种方式,它可以在多个 goroutine 之间进行通信和数据交换。管道可以看做是一个队列,通过它可以进行先进先出的数据传输,支持并发的读和写。
周小末天天开心
2023/10/16
7550
Golang中的管道(channel) 、goroutine与channel实现并发、单向管道、select多路复用以及goroutine panic处理
go中消费者和生产者关系的案例深刻理解<-之间的关系
贵哥的编程之路
2024/03/14
1120
15.Go语言-通道
通道(channel) ,就是一个管道,可以想像成 Go 协程之间通信的管道。它是一种队列式的数据结构,遵循先入先出的规则。
面向加薪学习
2022/09/04
6020
Go Goroutine
Goroutine又叫Go语言的协程。Goroutine是Go语言用来实现并发的直接方法。要想完全理解Goroutine必须从操作系统的进程和线程开始说起。
一行舟
2022/08/25
4540
Go Goroutine
推荐阅读
相关推荐
为什么这段代码会阻塞?
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验