Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊machinery的TaskProcessor

聊聊machinery的TaskProcessor

原创
作者头像
code4it
修改于 2021-04-06 03:24:27
修改于 2021-04-06 03:24:27
41400
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下machinery的TaskProcessor

TaskProcessor

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
    Process(signature *tasks.Signature) error
    CustomQueue() string
    PreConsumeHandler() bool
}

TaskProcessor接口定义了Process、CustomQueue、PreConsumeHandler方法

Worker

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Worker represents a single worker process
type Worker struct {
    server            *Server
    ConsumerTag       string
    Concurrency       int
    Queue             string
    errorHandler      func(err error)
    preTaskHandler    func(*tasks.Signature)
    postTaskHandler   func(*tasks.Signature)
    preConsumeHandler func(*Worker) bool
}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {
    return worker.Queue
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
    // If the task is not registered with this worker, do not continue
    // but only return nil as we do not want to restart the worker process
    if !worker.server.IsTaskRegistered(signature.Name) {
        return nil
    }

    taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
    if err != nil {
        return nil
    }

    // Update task state to RECEIVED
    if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
        return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
    }

    // Prepare task for processing
    task, err := tasks.NewWithSignature(taskFunc, signature)
    // if this failed, it means the task is malformed, probably has invalid
    // signature, go directly to task failed without checking whether to retry
    if err != nil {
        worker.taskFailed(signature, err)
        return err
    }

    // try to extract trace span from headers and add it to the function context
    // so it can be used inside the function if it has context.Context as the first
    // argument. Start a new span if it isn't found.
    taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
    tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
    task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

    // Update task state to STARTED
    if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
        return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
    }

    //Run handler before the task is called
    if worker.preTaskHandler != nil {
        worker.preTaskHandler(signature)
    }

    //Defer run handler for the end of the task
    if worker.postTaskHandler != nil {
        defer worker.postTaskHandler(signature)
    }

    // Call the task
    results, err := task.Call()
    if err != nil {
        // If a tasks.ErrRetryTaskLater was returned from the task,
        // retry the task after specified duration
        retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
        if ok {
            return worker.retryTaskIn(signature, retriableErr.RetryIn())
        }

        // Otherwise, execute default retry logic based on signature.RetryCount
        // and signature.RetryTimeout values
        if signature.RetryCount > 0 {
            return worker.taskRetry(signature)
        }

        return worker.taskFailed(signature, err)
    }

    return worker.taskSucceeded(signature, results)
}

//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
    worker.preConsumeHandler = handler
}

Worker实现了TaskProcessor接口,其Process方法先通过worker.server.GetRegisteredTask获取taskFunc,然后通过signature更新state为RECEIVED,之后设置为STARTED,之后执行task.Call(),最后根据结果更新task为failed或者success

小结

machinery的TaskProcessor接口定义了Process、CustomQueue、PreConsumeHandler方法。Worker实现了TaskProcessor接口,其Process方法先通过worker.server.GetRegisteredTask获取taskFunc,然后通过signature更新state为RECEIVED,之后设置为STARTED,之后执行task.Call(),最后根据结果更新task为failed或者success。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Golang任务队列machinery使用与源码剖析(一)
异步任务,是每一位开发者都遇到过的技术名词,在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,而任务队列由于其松耦合、易扩展的特性,成为了实现异步任务的可靠保证。
netkiddy
2018/07/30
10K3
Golang任务队列machinery使用与源码剖析(二)
在Golang任务队列machinery使用与源码剖析(一)一文中,我们主要对golang中任务队列machinery的设计结构以及具体模块的功能与源码实现进行了详细介绍。在了解了基本工作原理之后,本篇系列之二我们将从使用的角度,同时结合源码继续对machinery进一步介绍。
netkiddy
2018/08/06
7.3K5
聊聊machinery的TaskState
machinery的TaskState定义了PENDING、RECEIVED、STARTED、RETRY、SUCCESS、FAILURE状态;TaskState定义了TaskUUID、TaskName、State、Results、Error、CreatedAt、TTL属性;它提供了NewPendingTaskState、NewReceivedTaskState、NewStartedTaskState、NewSuccessTaskState、NewFailureTaskState、NewRetryTaskState方法来根据Signature来创建不同state的TaskState;另外还提供了IsCompleted、IsSuccess、IsFailure方法。
code4it
2021/04/02
3830
聊聊machinery的TaskState
machinery中文文档( 值得收藏 )
首先,我们需要定义一些任务。在example/tasks/tasks.go中查看示例任务。去看看几个例子吧。
Golang梦工厂
2022/07/08
1.8K0
machinery中文文档( 值得收藏 )
machinery入门看这一篇(异步任务队列)
我们在使用某些APP时,登陆系统后一般会收到一封邮件或者一个短信提示我们在某个时间某某地点登陆了。而邮件或短信都是在我们已经登陆后才收到,这里就是采用的异步机制。大家有没有想过这里为什么没有使用同步机制实现呢?我们来分析一下。假设我们现在采用同步的方式实现,用户在登录时,首先会去检验一下账号密码是否正确,验证通过后去给用户发送登陆提示信息,假如在这一步出错了,那么就会导致用户登陆失败,这样是大大影响用户的体验感的,一个登陆提示的优先级别并不是很高,所以我们完全可以采用异步的机制实现,即使失败了也不会影响用户的体验。前面说了这么多,那么异步机制该怎么实现呢?对,没错,就是machinery框架,听说你们还不会使用它,今天我就写一个小例子,我们一起来学习一下他吧。
Golang梦工厂
2022/07/08
1.1K0
machinery入门看这一篇(异步任务队列)
以太坊区块同步
首先根据Synchronise开始区块同步,通过findAncestor找到指定节点的共同祖先,并在此高度进行同步,同时开启多个goroutine同步不同的数据:header、receipt、body,假如同步高度为100的区块,必须先header同步成功同步完成才可以进行body和receipts的同步,而每个部分的同步大致都是由FetchParts来完成的,里面包含了各个Chan的配合,也会涉及不少的回调函数
Al1ex
2021/07/21
4.1K0
以太坊区块同步
Containerd深度剖析-CRI篇
目前我司现网的K8s集群的运行时已经完成从docker到Containerd的切换,有小伙伴对K8s与Containerd调用链涉及的组件不了解,其中Containerd和RunC是什么关系,docker和containerd又有什么区别,以及K8s调用Containerd创建容器又是怎样的流程,最终RunC又是如何创建容器的,诸如此类的疑问。本文就针对K8s使用Containerd作为运行时的整个调用链进行介绍和源码级别的分析。
zouyee
2023/01/31
1.6K0
Containerd深度剖析-CRI篇
Go每日一库之112:asynq
Asynq是一个go语言实现的分布式任务队列和异步处理库,基于redis,类似sidekiq和celery,他具有以下特点:
luckpunk
2023/09/30
1.4K0
聊聊dubbo-go的failbackCluster
dubbo-go-v1.4.2/cluster/cluster_impl/failback_cluster.go
code4it
2020/08/11
3560
聊聊tempodb的Pool
tempodb提供了一个job的pool,NewPool根据Config创建Pool,同时根据cfg.MaxWorkers启动对应个数的p.worker(q),然后执行p.reportQueueLength();RunJobs方法用于提交jobs并等待结果;Shutdown方法用于关闭pool的workQueue、shutdownCh这两个channel。
code4it
2021/01/29
2560
聊聊tempodb的Pool
Docker EOF问题排查
某天接到客户反馈,pod的事件中出现大量的 warning event: Readiness probe failed: OCI runtime exec failed: exec failed: EOF: unknown。但不影响客户访问该服务。
没有故事的陈师傅
2021/06/24
5.1K0
Docker EOF问题排查
以太坊RPC机制
RPC(remote process call),即远程过程调用,意思就是两台物理位置不同的服务器,其中一台服务器的应用想调用另一台服务器上某个应用的函数或者方法,由于不在同一个内存空间不能直接调用,因此需要通过网络来表达语义以及传入的参数,RPC是跨操作系统,跨编程语言的网络通信方式。
Al1ex
2021/07/21
3.7K0
以太坊RPC机制
CI/CD(七)镜像全球分发
代码托管:gitlab CI:tekton pipline/task: 阿里云 serverless容器(spot实例按秒计费)
EricWinn
2025/02/23
640
matic checkpoint理解
checkpoint是Matic协议中最关键的部分。它代表了Bor链状态的快照,应该由⅔+的验证器集证明,然后再验证并提交给部署在以太坊上的合约。
潇洒
2023/10/23
2040
matic checkpoint理解
聊聊cortex的kv.Client
github.com/cortexproject/cortex/pkg/ring/kv/client.go
code4it
2021/01/27
5190
聊聊cortex的kv.Client
kubernetes controller 解析
controller内部有个内存cache,cache 一般和lister/ indexer 一起配合使用, 用一个 Indexer interface进行的包装
王磊-字节跳动
2019/10/07
1.8K0
Tendermint 启动流程
Tendermint 的启动流程比较清析明了,各业务启动流程都在对应的实现代码,主启动流程加载所需配置,由各启动实现类启动自己对应业务,如节点启动相关在 nodeImpl,共识相关处理是 state 中进行处理。
潇洒
2023/10/23
2180
Tendermint 启动流程
一文读懂 K8s controller-runtime
在K8s开发中,经常能听过controller的概念,那么这些概念在K8s底层是如何实现,本文将详细介绍。
我是阳明
2023/10/28
1.2K0
一文读懂 K8s controller-runtime
containerd源码分析
本文是对containerd v0.2.4的源码分析。 ##Containerd源码流程图 源码接口调用详情 从ctr调用containerd-api ####checkpoint(用于快照,doc
Walton
2018/04/13
3K0
containerd源码分析
高效的图像处理:Golang、Asynq、Redis 和 Fiber 用于异步队列处理
在这篇简短的文章中,我将解释一种加速 Web 应用程序的常用方法。它涉及将任务从主线程中移开并将它们放在队列中以进行异步处理,使用队列单独组织和处理这些任务。
Michel_Rolle
2023/12/17
2.8K0
相关推荐
Golang任务队列machinery使用与源码剖析(一)
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验