前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Celery 任务:SQLAlchemy 会话处理指南

Celery 任务:SQLAlchemy 会话处理指南

作者头像
用户1418987
发布2024-09-06 10:40:31
1100
发布2024-09-06 10:40:31
举报
文章被收录于专栏:coder
Celery 任务:SQLAlchemy 会话处理指南_自定义
Celery 任务:SQLAlchemy 会话处理指南_自定义

最近在做 AI RAG 相关的项目功能,对于 RAG 需要生成一些文本处理的异步任务,使用到了 Celery 。今天就写写关于Celery 任务的文章,SQLAlchemy 的真实情况是:

  • 它的学习曲线比 Django ORM 更陡峭
  • 需要一些示例代码
  • 你需要了解一些较低层次的概念
  • 有一些难以理解的文档

如果您确实使用 `SQLAlchemy`,则必须为 `Celery` 任务编写一些示例文件,其中包含风险意大利面条代码。

在本文中,我将向您介绍一些基本的 SQLAlchemy 概念,并向您展示如何在 Celery 任务中使用 SQLAlchemy,而无需求助于第三方包,这

  • 帮助您了解事物是如何运作的
  • 提供了一个通用的解决方案,即使没有 Web 框架,也可以与 FlaskFastAPI 或其他任何东西一起使用
Celery 任务:SQLAlchemy 会话处理指南_自定义_02
Celery 任务:SQLAlchemy 会话处理指南_自定义_02

SQLAlchemy

Django ORM 世界中的生活非常简单。数据库操作通过模型对象提供:

代码语言:javascript
复制
from celery import Celery

app = Celery(...)

@app.task()
def my_task():
   book = Book.objects.get(title="To Kill a Mockingbird")
   ...
   book.save()

SQLAlchemy 世界中,情况非常不同。所有数据库操作都是通过会话对象执行的。会话与模型对象严格分开:

代码语言:javascript
复制
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

会话建立与数据库的对话,并代表您在其生命周期内加载、创建或操作的所有对象的暂存区域。

会话管理

您可以将 SQLAlchemy 会话视为数据库事务。作为一般规则,会话的生命周期应该与访问和操作数据库数据的函数和对象分开并位于外部。会议应该很短。例如,在传入 Celery 任务请求的上下文中,应在任务代码的开头创建会话并在结束时关闭,而不是无限期地保持打开状态并在任务之间共享。

代码语言:javascript
复制
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   session = Session(engine)
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()
   session.close()

或者,使用上下文管理器:

代码语言:javascript
复制
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

@app.task()
def my_task():
   with Session(engine) as session:
      book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
      ...
      session.add(book)
      session.commit()

Celery task

我对上述两个选项的问题是,它在每个任务中涉及大量重复的锅炉代码。如果每个 Celery 任务请求都带有一个现成的会话对象,而不必在开始时创建它并在最后关闭它,那就太好了。像这样的事情:

代码语言:javascript
复制
def my_task(session):
   book = session.query(Book).filter_by(title="To Kill a Mockingbird").one()
   ...
   session.add(book)
   session.commit()

事实证明,在运行时注入会话变量是不可能的。不过可以绑定任务。绑定任务始终将任务实例作为其第一个参数。

代码语言:javascript
复制
@app.task(bind=True)
def my_task(self):
   ...

默认情况下,self类型为celery.Taskcelery.Task定义了可用于 Celery 任务的所有方法,例如apply_asyncretry

您的代码和 Celery 任务之间的每次交互以及您的工作线程和 Celery 任务之间的每次交互都是通过这些celery.Task方法发生的。事实上,当你的worker处理一个任务时,它总是遵循以下顺序:

  1. 跑步before_start
  2. 运行任务
  3. 跑步after_return

即使步骤 2 中的任务抛出异常,也after_return能保证运行。您可以使用它来简化 SQLAlchemy 会话的创建和拆卸:

  • 在中创建会话before_start
  • 使会话可用于绑定任务
  • 关闭会话after_return
代码语言:javascript
复制
import celery
from sqlalchemy.orm import Session

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id]

请注意,每个流程只有一个任务实例,这意味着流程中的每个任务共享相同的任务对象。为了隔离每个任务请求的 SQLAlchemy 会话,我使用字典和唯一的任务请求 ID 作为键。

绑定任务

到目前为止我们有:

  • MyTask,自定义celery.Task实现
  • 一个任务,绑定celery.Task到 Celery 任务

缺少的是绑定MyTask而不是celery.Task任务。为此,Celery 提供了以下base参数:

代码语言:javascript
复制
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

app = Celery(...)
engine = create_engine("...")

class MyTask(celery.Task):
    def __init__(self):
        self.sessions = {}

    def before_start(self, task_id, args, kwargs):
        self.sessions[task_id] = Session(...)
        super().before_start(task_id, args, kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        session = self.sessions.pop(task_id)
        session.close()
        super().after_return(status, retval, task_id, args, kwargs, einfo)

    @property
    def session(self):
        return self.sessions[self.request.id] 

@app.task(bind=True, base=MyTask)
def my_task(self):
    book = self.session.query(Book).filter_by(title="To Kill a Mockingbird").one()
    ...
    self.session.add(book)
    self.session.commit()

这是一个通用解决方案,将 SQLAlchemy 会话处理委托给自定义任务类。它使您的任务代码免受重复的样板代码的影响。你怎么认为?可以在评论区讨论👇。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-05-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SQLAlchemy
  • 会话管理
  • Celery task
  • 绑定任务
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档