Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >聊聊machinery的TaskProcessor

聊聊machinery的TaskProcessor

原创
作者头像
code4it
修改于 2021-04-06 03:24:27
修改于 2021-04-06 03:24:27
41400
代码可运行
举报
文章被收录于专栏:码匠的流水账码匠的流水账
运行总次数:0
代码可运行

本文主要研究一下machinery的TaskProcessor

TaskProcessor

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// TaskProcessor - can process a delivered task
// This will probably always be a worker instance
type TaskProcessor interface {
    Process(signature *tasks.Signature) error
    CustomQueue() string
    PreConsumeHandler() bool
}

TaskProcessor接口定义了Process、CustomQueue、PreConsumeHandler方法

Worker

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
// Worker represents a single worker process
type Worker struct {
    server            *Server
    ConsumerTag       string
    Concurrency       int
    Queue             string
    errorHandler      func(err error)
    preTaskHandler    func(*tasks.Signature)
    postTaskHandler   func(*tasks.Signature)
    preConsumeHandler func(*Worker) bool
}

// CustomQueue returns Custom Queue of the running worker process
func (worker *Worker) CustomQueue() string {
    return worker.Queue
}

// Process handles received tasks and triggers success/error callbacks
func (worker *Worker) Process(signature *tasks.Signature) error {
    // If the task is not registered with this worker, do not continue
    // but only return nil as we do not want to restart the worker process
    if !worker.server.IsTaskRegistered(signature.Name) {
        return nil
    }

    taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
    if err != nil {
        return nil
    }

    // Update task state to RECEIVED
    if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
        return fmt.Errorf("Set state to 'received' for task %s returned error: %s", signature.UUID, err)
    }

    // Prepare task for processing
    task, err := tasks.NewWithSignature(taskFunc, signature)
    // if this failed, it means the task is malformed, probably has invalid
    // signature, go directly to task failed without checking whether to retry
    if err != nil {
        worker.taskFailed(signature, err)
        return err
    }

    // try to extract trace span from headers and add it to the function context
    // so it can be used inside the function if it has context.Context as the first
    // argument. Start a new span if it isn't found.
    taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
    tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
    task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

    // Update task state to STARTED
    if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
        return fmt.Errorf("Set state to 'started' for task %s returned error: %s", signature.UUID, err)
    }

    //Run handler before the task is called
    if worker.preTaskHandler != nil {
        worker.preTaskHandler(signature)
    }

    //Defer run handler for the end of the task
    if worker.postTaskHandler != nil {
        defer worker.postTaskHandler(signature)
    }

    // Call the task
    results, err := task.Call()
    if err != nil {
        // If a tasks.ErrRetryTaskLater was returned from the task,
        // retry the task after specified duration
        retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
        if ok {
            return worker.retryTaskIn(signature, retriableErr.RetryIn())
        }

        // Otherwise, execute default retry logic based on signature.RetryCount
        // and signature.RetryTimeout values
        if signature.RetryCount > 0 {
            return worker.taskRetry(signature)
        }

        return worker.taskFailed(signature, err)
    }

    return worker.taskSucceeded(signature, results)
}

//SetPreConsumeHandler sets a custom handler for the end of a job
func (worker *Worker) SetPreConsumeHandler(handler func(*Worker) bool) {
    worker.preConsumeHandler = handler
}

Worker实现了TaskProcessor接口,其Process方法先通过worker.server.GetRegisteredTask获取taskFunc,然后通过signature更新state为RECEIVED,之后设置为STARTED,之后执行task.Call(),最后根据结果更新task为failed或者success

小结

machinery的TaskProcessor接口定义了Process、CustomQueue、PreConsumeHandler方法。Worker实现了TaskProcessor接口,其Process方法先通过worker.server.GetRegisteredTask获取taskFunc,然后通过signature更新state为RECEIVED,之后设置为STARTED,之后执行task.Call(),最后根据结果更新task为failed或者success。

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark提交任务的不同方法及执行流程
了解Spark架构原理及相关任务提交流程前,我们需要先了解一下Spark中的一些角色概念。
大数据真好玩
2019/08/08
3.8K0
Spark内核详解 (4) | Spark 部署模式
实际上,除了上述这些通用的集群管理器外,Spark 内部也提供了方便用户测试和学习的简单集群部署模式。由于在实际生产环境下使用的绝大多数的集群管理器是 Hadoop YARN,因此我们关注的重点是 Hadoop YARN 模式下的 Spark 集群部署。
不温卜火
2020/10/28
1.7K0
Spark内核详解 (4) | Spark 部署模式
【Spark篇】---Spark中yarn模式两种提交任务方式
Spark可以和Yarn整合,将Application提交到Yarn上运行,和StandAlone提交模式一样,Yarn也有两种提交任务的方式。
LhWorld哥陪你聊算法
2018/09/13
2.5K0
【Spark篇】---Spark中yarn模式两种提交任务方式
【推荐】Spark知识点
客户那边需要快速出一个版本,开发的这块使用到的之前没怎么用过,比如用oozie调度spark程序时候,你可能在你本地调试代码没有问题,上传到集群上之后,运行就出各种错,加上我们使用的服务器配置很差,导致各种服务需要的资源都不都用,然后每天就是在各种配置,话不多说了,赶紧来复习一下spark;
木野归郎
2022/04/14
3730
【推荐】Spark知识点
Zzreal的大数据笔记-SparkDay03
Spark的运行模式 Spark的运行模式多种多样,灵活多变,部署在单机上时,既可以用本地模式运行,也可以用伪分布模式运行,而当以分布式集群的方式部署时,也有众多的运行模式可供选择,这取决于集群的实际情况,底层的资源调度即可以依赖外部资源调度框架,也可以使用Spark内建的Standalone模式。对于外部资源调度框架的支持,目前的实现包括相对稳定的Mesos模式,以及hadoop YARN模式。 本地模式:常用于本地开发测试,本地还分别 local 和 local cluster (1)standal
企鹅号小编
2018/01/25
6190
【Spark】 Spark的基础环境 Day02
​ 当Spark Application运行到YARN上时,在提交应用时指定master为yarn即可,同时需要告知YARN集群配置信息(比如ResourceManager地址信息),此外需要监控Spark Application,配置历史服务器相关属性。
Maynor
2021/12/07
3630
【Spark】 Spark的基础环境 Day02
【Spark】Spark之what
Spark:通用大数据快速处理引擎。可以基于Hadoop上存储的大数据(HDFS、Hive、HBase等任何实现了Hadoop接口的存储系统)进行计算。
章鱼carl
2022/03/31
9300
【Spark】Spark之what
Spark核心技术原理透视二(Spark运行模式)
上一章节详细讲了Spark的运行原理,没有关注的童鞋可以关注加米谷大数据查看上一章节的详细内容。通过Spark运行原理的讲解大家了解了Spark在底层的运行,那Spark的运行模式又是什么样的呢?通过本文以下的讲解大家可以详细的学习了解。
加米谷大数据
2018/03/27
1.5K6
Spark核心技术原理透视二(Spark运行模式)
Python大数据之PySpark(四)SparkBase&Core
在哪个文件下面更改?spark-env.sh中增加YARN_CONF_DIR的配置目录
Maynor
2023/10/04
5540
Python大数据之PySpark(四)SparkBase&Core
Spark基础
1.Spark 使用DAG 调度器、查询优化器和物理执行引擎,能够在批处理和流数据获得很高的性能。2.spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘;3.Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask。Spark程序运行并行度高;
857技术社区
2022/05/17
4540
Spark基础
大数据基础:Spark工作原理及基础概念
导语 | Apache Spark 是专为大规模数据处理而设计的快速通用计算引擎,在数据挖掘和机器学习领域有着广泛的应用,现在也已形成一个高速发展、应用广泛的生态系统。本文将为大家详细介绍 Spark 的核心技术原理,希望与大家一同交流。文章作者:熊峰,腾讯大数据研发工程师。
腾讯云开发者
2020/12/23
1.7K0
大数据基础:Spark工作原理及基础概念
带你理解并亲手实践 Spark HA 部署配置及运行模式
由于 Spark 是计算框架,还需要有底层存储系统、资源协调管理、分布式协作管理等框架等进行支撑,因此我们这里使用在《万字+50图,详解 Hadoop HA 完全分布式部署配置及运行调试》中部署的 Hadoop 作为 Spark 的存储及管理系统,在此基础上以 HA 模式来安装部署并运行 Spark 集群。
数人之道
2022/03/26
2.4K0
带你理解并亲手实践 Spark HA 部署配置及运行模式
不会这20个Spark热门技术点,你敢出去面试大数据吗?
关于大数据面试中对Spark的知识考查不需本菌多解释什么了吧~本篇博客,博主为大家分享20个Spark热门技术点,希望今年出去面试,实习的同学,尤其是想去大厂的同学,一定要把下面的20个技术点看完。
大数据梦想家
2021/01/27
6800
不会这20个Spark热门技术点,你敢出去面试大数据吗?
Spark 在Yarn上运行Spark应用程序
在 YARN 中,每个应用程序实例都有一个 ApplicationMaster 进程,该进程是为该应用程序启动的第一个容器。应用程序负责从 ResourceManager 上请求资源。一旦分配了资源,应用程序将指示 NodeManagers 启动容器。ApplicationMasters 消除了对活跃客户端的依赖:启动应用程序的进程可以终止,并且从在集群上由 YARN 管理的进程继续协作运行。
smartsi
2019/08/07
2K0
2021年大数据Spark(九):Spark On Yarn两种模式总结
包含两个部分:应用管理者AppMaster和运行应用进程Process(如MapReduce程序MapTask和ReduceTask任务),如下图所示:
Lansonli
2021/10/09
4960
Spark内核分析之spark作业的三种提交方式
        最近在研究Spark源码,顺便记录一下,供大家学习参考,如有错误,请批评指正。好,废话不多说,这一篇先来讲讲Spark作业提交流程的整体架构。
z小赵
2018/09/05
7770
Spark内核分析之spark作业的三种提交方式
Spark on yarn
spark on yarn是spark集群模式之一,通过resourcemanager进行调度,较之standalone模式,不需要单独启动spark服务。
阿dai学长
2020/03/09
1.6K0
Spark on yarn
数据分析工具篇——spark on yarn模式
spark on yarn架构有两种模式,分为Yarn-client模式和Yarn-cluster模式,本文与大家一起了解一下这两种模式:
数据森麟
2021/03/09
8470
数据分析工具篇——spark on yarn模式
Spark on Yarn | Spark,从入门到精通
欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)
美图数据技术团队
2018/08/13
8920
Spark on Yarn | Spark,从入门到精通
2020年最新Spark企业级面试题【上】
现在距离2021年还有不到一个月的时间了,是不是有的小伙明年不知该怎么复习spark,以及不知道该备战企业中会问到那些问题。好今天他来了总结了20个企业中经常被问到的面题以及会附带一些笔试题哦,编写不易建议收藏。
大数据老哥
2021/02/04
4960
2020年最新Spark企业级面试题【上】
推荐阅读
相关推荐
Spark提交任务的不同方法及执行流程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验