如何在Celery任务中“优雅”地管理数据库session?

发布于 2022-09-06 12:07:32 字数 778 浏览 20 评论 0

我有一些操作数据库的celery任务,需要在代码开头新建session,在代码结尾关闭session。

当前的方式是:

@app.task
def celery_task():
    session = DB_Session()
    try:
        foo
    except:
        bar
    finally:
        session.close()

但问题是,这样对错误处理很不友好。而且一整个任务里都用try...except...finally感觉很难看。


最近看Celery的官方文档,想到可以新建一个自定义的MyTask,并在Task.after_return()中关闭session,然后所有celery任务定义的时候都base=MyTask。如下:

class MyTask(celery.task):
    def after_return(self, *foo):
        self.session.close()

@app.task(bind=True, base=MyTask)
def celery_task(self):
    foo

我的问题是,如何才能在一个task创建的时候执行新建session的操作?我在celery的文档中没有找到一个方法是在task创建的时候执行的。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

像极了他 2022-09-13 12:07:32

官方文档或源码中有没有相关实现不是很清楚,但是就算没有实现自己实现也是可以的。
相关实现大概率会通过decorator来做,所以自己写一个decorator就可以了

import functools

import flask

@bp.route('status', methods=['GET'])
def status():
  celery_task.delay()
  return flask.jsonify({'status': 'ok'})

def my_task(fn):
  @functools.wraps(fn)
  def func_wrapper(*args, **kwargs):
    try:
      session = DB_Session()
      res = fn('session', *args, **kwargs)
    except:
      pass
    finally:
      session.close()
      return res
  return func_wrapper


@app.task
@my_task
def celery_task(session):
  pass
傲世九天 2022-09-13 12:07:32

Task.on_bound 是一个类方法,不知道是否能够使用,它的注释如下这样:

Note:
            This class method can be defined to do additional actions when
            the task class is bound to an app.

我也正在做类似的事情:在celery任务中做数据库操作,也遇到类似的烦恼,我先占个坑,测试一下再来更新。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文