前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流

超实用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流

作者头像
somenzz
发布2022-10-25 21:09:07
2.3K0
发布2022-10-25 21:09:07
举报
文章被收录于专栏:Python七号

异步任务,是 Web 开发中经常遇到的问题,比如说用户提交了一个请求,虽然这个请求对应的任务非常耗时,但是不能让用户等在这里,通常需要立即返回结果,告诉用户任务已提交。任务可以在后续慢慢完成,完成后再给用户发一个完成的通知。

今天分享一份代码,使用 Celery、RabbitMQ 和 MongoDB 实现一个异步任务工作流,你可以修改 task.py 来实现你自己的异步任务。

架构图如下:

其中 Celery 来执行异步任务,RabbitMQ 作为消息队列,MongoDB 存储任务执行结果,FastAPI 提供 Web 接口。

以上所有模块均可使用 Docker 一键部署。

下面为 Demo 使用方法:

1、确保本机已安装 Docker、Git

2、下载源代码:

代码语言:javascript
复制
git clone https://github.com/aarunjith/async-demo.git

3、部署并启动:

代码语言:javascript
复制
cd async-demo
docker compose up --build

4、启动一个异步任务:

代码语言:javascript
复制
$ curl -X POST http://localhost:8080/process

任务会发送到消息队列,同时会立即返回一个任务 id:

代码语言:javascript
复制
❯ curl -X POST http://localhost:8080/process
{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%

5、查询任务状态:

代码语言:javascript
复制
curl -X POST http://localhost:8080/check_progress/<task_id>

任务完成后的返回结果如下:

代码语言:javascript
复制
 ❯ curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{"status":"SUCEESS","data":"\"hello\""}%

代码目录结构如下:

其中 app.py 如下:

代码语言:javascript
复制
from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn

# Lets create a connection to our backend where celery stores the results
client = MongoClient("mongodb://mongodb:27017")

# Default database and collection names that Celery create
db = client['task_results']
coll = db["celery_taskmeta"]

app = FastAPI()


@app.post('/process')
async def process_text_file():
    '''
    Process endpoint to trigger the start of a process
    '''
    try:
        result = start_processing.delay()
        logger.info(f'Started processing the task with id {result.id}')
        return {
            "status": result.state,
            'id': result.id,
            'error': ''
        }
    except Exception as e:
        logger.info(f'Task Execution failed: {e}')
        return {
            "status": "FAILURE",
            'id': None,
            'error': e
        }


@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
    '''
    Endpoint to check the task progress and fetch the results if the task is
    complete.
    '''
    try:
        result = AsyncResult(task_id)
        if result.ready():
            data = coll.find({'_id': task_id})[0]
            return {'status': 'SUCEESS', 'data': data['result']}
        else:
            return {"status": result.state, "error": ''}
    except Exception as e:
        data = coll.find({'_id': task_id})[0]
        if data:
            return {'status': 'SUCEESS', 'data': data['result']}
        return {'status': 'Task ID invalid', 'error': e}

if __name__ == "__main__":
    uvicorn.run("app:app", host='0.0.0.0', port='8080')

如果要实现自己的任务队列,就修改 task.py 来添加自己的异步任务,可以整合到自己的项目中。

最后的话

Celery 是异步任务非常好用的工具,推荐阅读分布式异步任务队列神器之-Celery一文搞定 celery 任务远程调用。RabbitMQ 消息队列可以确保服务重新启动时数据也不丢失,因此这个 Demo 有很强的实用价值,如果觉得有帮助,可以转发、关注、讨论

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python七号 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 最后的话
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档