如何在Celery任务中“优雅”地管理数据库session?
我有一些操作数据库的celery任务,需要在代码开头新建session,在代码结尾关闭session。
当前的方式是:
@app.task
def celery_task():
session = DB_Session()
try:
foo
except:
bar
finally:
session.close()
但问题是,这样对错误处理很不友好。而且一整个任务里都用try...except...finally
感觉很难看。
最近看Celery的官方文档,想到可以新建一个自定义的MyTask
,并在Task.after_return()
中关闭session,然后所有celery任务定义的时候都base=MyTask
。如下:
class MyTask(celery.task):
def after_return(self, *foo):
self.session.close()
@app.task(bind=True, base=MyTask)
def celery_task(self):
foo
我的问题是,如何才能在一个task创建的时候执行新建session的操作?我在celery的文档中没有找到一个方法是在task创建的时候执行的。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
官方文档或源码中有没有相关实现不是很清楚,但是就算没有实现自己实现也是可以的。
相关实现大概率会通过decorator来做,所以自己写一个decorator就可以了
Task.on_bound 是一个类方法,不知道是否能够使用,它的注释如下这样:
我也正在做类似的事情:在celery任务中做数据库操作,也遇到类似的烦恼,我先占个坑,测试一下再来更新。