首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊storagetapper的pool

聊聊storagetapper的pool

原创
作者头像
code4it
修改于 2021-03-04 01:47:46
修改于 2021-03-04 01:47:46
45300
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下storagetapper的pool

Thread

storagetapper/pool/pool.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
type Thread interface {
    Start(m uint, f func())
    Adjust(m uint)
    Terminate() bool
    NumProcs() uint
}

Thread接口定义了Start、Adjust、Terminate、NumProcs方法

pool

storagetapper/pool/pool.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/*Create helps to hide poolImpl in the package, but not really required */
func Create() Thread {
    return &poolImpl{}
}

type poolImpl struct {
    mutex       sync.Mutex
    numProcs    uint
    maxNumProcs uint
    fn          func()
}

/*Start instantiates a pool of size of 'm' of 'f' goroutines */
/*Start and Create separation allows to pass pool instance to 'f' goroutine */
func (p *poolImpl) Start(m uint, f func()) {
    p.fn = f
    p.Adjust(m)
}

/*Adjust resizes the pool. It creates new threads if requested size is bigger
* then current size, while it assumes threads cooperation when requested size is
* smaller then current size. Threads should periodically call Terminate function
* and obey the result. */
func (p *poolImpl) Adjust(m uint) {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    log.Debugf("Current size=%v, current maximum size=%v, requested size=%v", p.numProcs, p.maxNumProcs, m)
    p.maxNumProcs = m
    if p.numProcs < p.maxNumProcs {
        adj := p.maxNumProcs - p.numProcs
        shutdown.Register(int32(adj))
        for i := uint(0); i < adj; i++ {
            go func() { defer shutdown.Done(); p.fn() }()
        }
        p.numProcs = m
    }
}

/*Terminate return true if the caller thread need to terminate */
func (p *poolImpl) Terminate() bool {
    //Uncomment if Terminate is called frequently
    //Introduces a race when thread can miss Pool resize event, that's ok, so as
    //some other threads may see the event, or we will see it on the next
    //iteration
    //  if p.numProcs <= p.maxNumProcs {
    //      return false
    //  }

    p.mutex.Lock()
    defer p.mutex.Unlock()

    if p.numProcs > p.maxNumProcs {
        p.numProcs--
        log.Debugf("Terminating. Current size=%v, current maximum size=%v", p.numProcs, p.maxNumProcs)
        return true
    }

    return false
}

/*NumProcs return current size of the pool */
func (p *poolImpl) NumProcs() uint {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    return p.numProcs
}

poolImpl定义了mutex、numProcs、maxNumProcs、fn属性;它实现了Thread接口,其Start方法设置了fn,同时执行Adjust方法;Adjust方法在numProcs小于maxNumProcs时会执行shutdown.Register,然后挨个执行shutdown.Done();Terminate方法对于numProcs大于maxNumProcs的情况递减numProcs

实例

storagetapper/pool/pool_test.go

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
func TestBasic(t *testing.T) {
    var m sync.Mutex
    var nProcs int32

    sig := make(chan bool)

    p := Create()

    if p.NumProcs() != 0 {
        t.Fatalf("Initially not zero")
    }

    p.Start(2, func() {
        m.Lock()
        atomic.AddInt32(&nProcs, 1)
        log.Debugf("Starting new proc, nProcs=%v", nProcs)
        m.Unlock()
        for !p.Terminate() {
            <-sig
            log.Debugf("Woken up")
        }
        m.Lock()
        atomic.AddInt32(&nProcs, -1)
        log.Debugf("Terminating proc, nProcs=%v", nProcs)
        m.Unlock()
    })

    /* Check that both real number and reported by thread pool equal to expected
    * value */
    waitFor(&nProcs, 2, 5, t)
    if p.NumProcs() != 2 {
        t.Fatalf("numProcs != 2")
    }

    p.Adjust(8)

    waitFor(&nProcs, 8, 5, t)
    if p.NumProcs() != 8 {
        t.Fatalf("numProcs != 8")
    }

    p.Adjust(3)

    for i := 0; i < 5; i++ {
        sig <- true
    }

    waitFor(&nProcs, 3, 5, t)
    if p.NumProcs() != 3 {
        t.Fatalf("numProcs != 3")
    }

    p.Adjust(0)
    for i := 0; i < 3; i++ {
        sig <- true
    }

    waitFor(&nProcs, 0, 5, t)
    if p.NumProcs() != 0 {
        t.Fatalf("numProcs != 0")
    }
}

小结

storagetapper的Thread接口定义了Start、Adjust、Terminate、NumProcs方法;poolImpl实现了Thread接口;其Adjust可以在numProcs小于maxNumProcs的时候进行扩容;Terminate会在numProcs大于maxNumProcs的时候递减numProcs。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
聊聊storagetapper的cache
storagetapper的cache是一个cacheEntry的map,cacheEntry定义了Pipe和config.PipeConfig;CacheGet方法会获取cache,获取不到则Create;CacheDestroy则会通过加锁遍历cache,挨个执行pipe.Close()。
code4it
2021/03/05
2610
聊聊storagetapper的cache
聊聊tempodb的Pool
tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。
code4it
2021/02/08
3090
聊聊tempodb的Pool
tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。
code4it
2021/01/29
2650
聊聊tempodb的Pool
[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架
简介: 本文描述了使用Go语言实现的、适应于Go语言并发模型的一种支持多路复用的网络协议处理框架,并提供了框架的代码实现。作者将这种框架用于网络游戏服务器中的协议处理,但也可用于其他领域。 应用背景: 在网络游戏服务器设计中,一般都会遇到协议多路复用的场景。比如登录服务器和玩家客户端之间有1:N的多个TCP连接;登录服务器和游戏服务器之间是1:1的TCP连接。玩家登录游戏的大致流程是这样的: 玩家连接登录服务器 登录服务器向数据库请求玩家数据 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括
李海彬
2018/03/22
9130
聊聊storagetapper的server
storagetapper的server提供了StartHTTPServer、Shutdown方法;其init方法注册了/health,/schema,/cluster,/table,/config,/这几个url。
code4it
2021/02/28
2430
聊聊storagetapper的server
详解Go语言调度循环源码实现
提到"调度",我们首先想到的就是操作系统对进程、线程的调度。操作系统调度器会将系统中的多个线程按照一定算法调度到物理CPU上去运行。虽然线程比较轻量,但是在调度时也有比较大的额外开销。每个线程会都占用 1M 以上的内存空间,线程切换和恢复寄存器中的内容也需要向系统申请资源。
luozhiyun
2021/02/21
1.4K0
聊聊storagetapper的Lock
storagetapper的Lock接口定义了TryLock、TryLockShared、Lock、Refresh、Unlock、Close方法;myLock定义了conn、connID、name、db.Addr、n、mu、isLocked属性,它使用db实现了Lock接口,它借助了mysql的GET_LOCK、RELEASE_LOCK、IS_USED_LOCK函数来实现。
code4it
2021/03/01
2350
聊聊storagetapper的Lock
Go 常见并发模式实现(二):通过缓冲通道实现共享资源池
今天这篇教程我们继续演示常见并发模式的 Go 语言实现 —— 通过缓冲通道(channel)实现共享资源池。
学院君
2020/10/19
1.3K0
Go 常见并发模式实现(二):通过缓冲通道实现共享资源池
手摸手Go 深入剖析sync.Pool
如果能够将所有内存都分配到栈上无疑性能是最佳的,但不幸的是我们不可避免需要使用堆上分配的内存。我们可以优化使用堆内存时的性能损耗吗?答案是肯定的。Go同步包中,sync.Pool提供了保存和访问一组临时对象并复用它们的能力。
用户3904122
2022/06/29
9220
手摸手Go 深入剖析sync.Pool
golang集成测试:dockertest testcontainers-go
在做集成测试的时候,每次测试前,如果通过docker重启一个干净的容器是不是免去了数据清理的苦恼。https://github.com/testcontainers/testcontainers-go和https://github.com/ory/dockertest可以解决我们的苦恼,它们很相似都是调用docker的api实现镜像的拉取和容器的启动关闭。然后我们可以基于容器做对应的集成测试。
golangLeetcode
2023/03/01
8380
golang集成测试:dockertest testcontainers-go
Go语言实战笔记(十六)| Go 并发示例-Pool
这篇文章演示使用有缓冲的通道实现一个资源池,这个资源池可以管理在任意多个goroutine之间共享的资源,比如网络连接、数据库连接等,我们在数据库操作的时候,比较常见的就是数据连接池,也可以基于我们实现的资源池来实现。
飞雪无情
2018/08/28
6190
深度解密Go语言之sync.pool
最近在工作中碰到了 GC 的问题:项目中大量重复地创建许多对象,造成 GC 的工作量巨大,CPU 频繁掉底。准备使用 sync.Pool 来缓存对象,减轻 GC 的消耗。为了用起来更顺畅,我特地研究了一番,形成此文。本文从使用到源码解析,循序渐进,一一道来。
梦醒人间
2020/04/27
1.3K0
聊聊golang的tunny
tunny的Worker接口定义了Process、BlockUntilReady、Interrupt、Terminate方法;NewFunc方法创建的是closureWorker,NewCallback方法创建的是callbackWorker。
code4it
2021/04/27
5280
聊聊golang的tunny
Golang连接池的几种实现案例
而维持一个连接池,最基本的要求就是要做到:thread safe(线程安全),尤其是在Golang这种特性是goroutine的语言中。
KevinYan
2020/03/12
2.1K0
golang库源码学习——Pond,小而精的工作池库
pond 是一个轻量级的 Goroutine 池库,用于高效管理并发任务。它提供了灵活的配置选项和多种策略,适合处理高并发场景。
粲然
2025/05/22
1250
golang库源码学习——Pond,小而精的工作池库
linux网络编程学习笔记之五 —–并发机制与线程�
简述下常见的进程和线程分配方式:(好吧,我仅仅是举几个样例作为笔记。。。并发的水太深了,不敢妄谈。。。)
全栈程序员站长
2022/07/12
3590
linux网络编程学习笔记之五 —–并发机制与线程�
btcd p2p 网络分析
比特币依赖于对等网络来实现信息的共享与传输,网络中的每个节点即可以是客户端也可以是服务端,本篇文章基于比特币go版本btcd探索比特币对等网络的实现原理,整个实现从底层到上层可以分为地址,连接,节点三层,每层都有自己的功能与职责。下面逐一的分析这三个部分的构成与功能
魂祭心
2019/03/12
1.7K0
深入分析Go1.18 GMP调度器底层原理
Go 语言有强大的并发能力,能够简单的通过 go 关键字创建大量的轻量级协程 Goroutine,帮助程序快速执行各种任务,比Java等其他支持多线程的语言在并发方面更为强大,除了会用它,我们还需要掌握其底层原理,自己花时间把 GMP 调度器的底层源码学习一遍,才能对它有较为深刻的理解和掌握,本文是自己个人对于 Go语言 GMP 调度器(Go Scheduler)底层原理的学习笔记。
涂明光
2022/11/27
2.7K0
sync.Pool实现原理
对象的创建和销毁会消耗一定的系统资源(内存,gc等),过多的创建销毁对象会带来内存不稳定与更长的gc停顿,因为go的gc不存在分代,因而更加不擅长处理这种问题。因而go早早就推出Pool包用于缓解这种情况。Pool用于核心的功能就是Put和Get。当我们需要一个对象的时候通过Get获取一个,创建的对象也可以Put放进池子里,通过这种方式可以反复利用现有对象,这样gc就不用高频的促发内存gc了。
魂祭心
2019/08/12
7120
[Go] golang缓冲通道实现资源池
go的pool资源池: 1.当有多个并发请求的时候,比如需要查询数据库 2.先创建一个2个容量的数据库连接资源池 3.当一个请求过来的时候,去资源池里请求连接资源,肯定是空的就创建一个连接,执行查询,结束后放入了资源池里 4.当第二个请求过来的时候,也是去资源池请求连接资源,就直接在池中拿过来一个连接进行查询 5.当并发大的时候,资源池里面没有足够连接资源,就会不停创建新资源,放入池里面的时候,也会放不进去,就主动关闭掉这个资源 6.这里的资源池实质上是一个缓冲通道,里面放着连接资源
唯一Chat
2019/09/10
8800
[Go] golang缓冲通道实现资源池
相关推荐
聊聊storagetapper的cache
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档