如何使用 Ruby 推送 Celery 任务
在我们的应用程序中,我们有一个 Rails 应用程序,它可以对 Flask API 进行 API 调用。然后 API 会调度一个任务,我们使用 Redis 作为队列。
@app.route("/api/article", methods=["POST"])
def api_article():
app.logger.info("[route] /api/article")
.
.
res_id = celery.send_task("tasks.text_stats", args=[article], kwargs={}).id
@celery.task(name="tasks.text_stats")
def text_stats(article):
logger.info("text_stats")
我的想法是,如果 Rails 和 Flask 都可以访问 Redis 数据库,我们可以直接在 Redis 中创建任务,然后由 Celery 工作线程接收该任务。
问题:
- 如何使用 Ruby 在 Redis 中创建 Celery 作业?
- 有没有 Ruby 库支持这个?
In our app we have a Rails app that makes an API call to a Flask API. The API then schedules a task, and we are using Redis as the queue.
@app.route("/api/article", methods=["POST"])
def api_article():
app.logger.info("[route] /api/article")
.
.
res_id = celery.send_task("tasks.text_stats", args=[article], kwargs={}).id
@celery.task(name="tasks.text_stats")
def text_stats(article):
logger.info("text_stats")
What I am thinking is if the Redis database is accessible to both Rails and Flask, we could just create the task directly in Redis, which would then be picked up by the Celery worker.
Questions:
- How to create a Celery job in Redis using Ruby?
- Are there any Ruby libraries which would support this?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
为了解决这个问题,我们可以简单地在 Redis 中检查 Celery 如何将任务推送到
LIST
中。这是一个由 Celery 推送到 Redis 的示例 celery 任务:
上面比较重要的键是
headers.task
,这是你需要调用的 python 函数名称,以及headers.argsrepr 和 headers.kwargsrepr 是一个字符串化元组/字典,表示需要传递给函数的 args/kwargs。
上面的 dict 中还有其他键,例如routing_key(celery 队列)、几个 id(您可以自动生成这些)、eta,您可以使用它们来自定义行为。
一旦我们了解了 celery 如何与 Redis 交互,类似于 这个示例,您只需使用 Ruby 的 Redis 客户端和字符串化的 python
dict
运行LPUSH
即可。实际上,更麻烦的部分是 celery 期望字符串化的 LIST 项采用 dict 格式,这与 Ruby 的 to_json 存在一些差异。您需要手动执行该部分(例如将
booleans
转换为 pythonbool
)To solve this problem we can simply inspect in Redis how Celery pushes tasks to the
LIST
.This is an example celery task that's been pushed to Redis by Celery:
The more important keys in the above is
headers.task
, which is the python function name you need to call, as well asheaders.argsrepr
andheaders.kwargsrepr
, which is a stringified tuple/dict representing the args/kwargs you need to pass to the function.There are other keys in the above
dict
like routing_key (the celery queue), a couple of ids (you can autogenerate these),eta
, that you could use to customize the behavior.Once we understand how celery interacts with Redis, similar to this example, you can simply run
LPUSH
with Ruby's Redis client with the stringified pythondict
.The more troubling part, really, is that celery expects the stringified LIST item to be in
dict
format, which has some differences with Ruby'sto_json
. You'll need to manually do that part (such as convertingbooleans
to pythonbool
)