package main
import (
"fmt"
"time"
)
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
f func() error //一个无参的函数类型
}
//通过NewTask来创建一个Task
func NewTask(f func() error) *Task {
t := Task{
f: f,
}
return &t
}
//执行Task任务的方法
func (t *Task) Execute() {
t.f() //调用任务所绑定的函数
}
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
EntryChannel chan *Task //对外接收Task的入口
worker_num int //协程池最大worker数量,限定Goroutine的个数
JobsChannel chan *Task //协程池内部的任务就绪队列
}
//创建一个协程池
func NewPool(cap int) *Pool {
p := Pool{
EntryChannel: make(chan *Task),
worker_num: cap,
JobsChannel: make(chan *Task),
}
return &p
}
//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int) {
//worker不断的从JobsChannel内部任务队列中拿任务
for task := range p.JobsChannel {
//如果拿到任务,则执行task任务
task.Execute()
fmt.Println("worker ID ", work_ID, " 执行完毕任务")
}
}
//让协程池Pool开始工作
func (p *Pool) Run() {
//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
// 每一个Worker用一个Goroutine承载
for i := 0; i < p.worker_num; i++ {
fmt.Println("开启固定数量的Worker:", i)
go p.worker(i)
}
//2, 从EntryChannel协程池入口取外界传递过来的任务
// 并且将任务送进JobsChannel中
for task := range p.EntryChannel {
p.JobsChannel <- task
}
//3, 执行完毕需要关闭JobsChannel
close(p.JobsChannel)
fmt.Println("执行完毕需要关闭JobsChannel")
//4, 执行完毕需要关闭EntryChannel
close(p.EntryChannel)
fmt.Println("执行完毕需要关闭EntryChannel")
}
//主函数
func main() {
//创建一个Task
t := NewTask(func() error {
fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
return nil
})
//创建一个协程池,最大开启3个协程worker
p := NewPool(3)
//开一个协程 不断的向 Pool 输送打印一条时间的task任务
go func() {
for {
p.EntryChannel <- t
}
}()
//启动协程池p
p.Run()
}
动态版(待完善)
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
f func()
}
func NewTask(f func()) Task {
return Task{
f: f,
}
}
type Work struct {
status int
closeChan chan struct{}
}
func NewWork() *Work {
return &Work{
status: 0,
closeChan: make(chan struct{}),
}
}
func (w *Work) work(job chan Task, wg *sync.WaitGroup) {
ticker := time.NewTicker(3 * time.Second)
//定时查询自己
for {
select {
case <-w.closeChan:
//判断当前最小值
wg.Done()
break
case task := <-job:
w.status = 1
task.f()
case <-ticker.C:
w.status = 0
ticker.Reset(3 * time.Second)
}
}
}
type Pool struct {
Max int
Min int
JobChan chan Task
Workers []*Work
}
func NewPoll(max, min int) *Pool {
return &Pool{
Max: max,
Min: min,
JobChan: make(chan Task),
Workers: make([]*Work, 0),
}
}
func (p *Pool) Submit(t Task) {
p.JobChan <- t
}
func (p *Pool) Run() {
var wg sync.WaitGroup
for i := 0; i < p.Min; i++ {
// go NewWork().work(p.JobChan, &wg)
p.Workers = append(p.Workers, NewWork())
}
p.Schdule(&wg)
fmt.Println("11111111")
p.Watch(&wg)
wg.Wait()
}
func (p *Pool) Schdule(wg *sync.WaitGroup) {
for j := 0; j < len(p.Workers); j++ {
if p.Workers[j].status == 0 {
wg.Add(1)
go p.Workers[j].work(p.JobChan, wg)
}
}
}
// 监控worker
func (p *Pool) Watch(wg *sync.WaitGroup) {
fmt.Println("watch")
//TODO 监控work数 小于最大数,且持续5秒没完成,则创建新worker
for {
ticker := time.NewTicker(5 * time.Second)
select {
case <-ticker.C:
fmt.Println("timeeeeeeeeeeee")
for _, v := range p.Workers {
if v.status == 0 && len(p.Workers) > p.Min {
//关闭worker
close(v.closeChan)
}
}
if len(p.Workers) < p.Max && len(p.Workers) >= p.Min {
p.Workers = append(p.Workers, NewWork())
p.Schdule(wg)
}
ticker.Reset(5 * time.Second)
}
}
}
func main() {
p := NewPoll(10, 3)
go func() {
for {
p.Submit(NewTask(func() {
// fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
}))
}
}()
go func() {
for {
time.Sleep(time.Second)
fmt.Println(len(p.Workers))
}
}()
p.Run()
}
参考链接:https://blog.csdn.net/finghting321/article/details/106492915/