芹菜和弦回调一直始终启动

发布于 2025-02-09 05:27:13 字数 1209 浏览 2 评论 0原文

更新完成后,我正在尝试使用和弦启动报告更新。

@shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_kwargs {'max_retries': 5})
def upload(df: pd.DataFrame, **kwargs):
    ed = EntityDataPoint(df, **kwargs)
    uploadtasks, source, subtype = ed.upload_database()
    chord(uploadtasks)(final_report.si(logger=ed.logger, 
                                       source=source, 
                                       subtype=subtype,
                                       index=ed.index))

上loadTask为:

    g = group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size))

当和弦的标题具有2个以上的元素时,前两个子任务成功,并且没有启动组中的其余任务,而没有任何错误,没有任何错误,也没有任何信息芹菜工人记录。在检查工人之后,安排了芹菜检查活动,排队似乎没有任何等待任务。

如果标题(组)具有2个或更少的元素,则没有问题,组任务完成,调用回调。

它似乎不取决于元素的大小(如果组中的每个子任务都发送100行,我们仍然对1000行具有相同的行为)。

如果我只是启动组任务,而没有和弦和回调,那么任务成功而没有任何错误。

我尝试为和弦使用不同的语法,但似乎并没有改变任何内容。

我尝试使用group.link功能来查看会发生什么,并且在执行此操作时,该组似乎完成了,但是在所有组任务完成后,回调不会发生,因为它并不是为此设计的,正如我从我所理解的那样文档,所以这不是我想要的行为。

我使用的是REDIS 7.0.0经纪人的芹菜5.2.3和django 3.2.13带PSQL的后端,带有Python 3.9。 一切都在单独的Docker容器上运行。

I'm trying to use a chord to launch a report update after the update is completed.

@shared_task(autoretry_for=(Exception,), retry_backoff=True, retry_kwargs {'max_retries': 5})
def upload(df: pd.DataFrame, **kwargs):
    ed = EntityDataPoint(df, **kwargs)
    uploadtasks, source, subtype = ed.upload_database()
    chord(uploadtasks)(final_report.si(logger=ed.logger, 
                                       source=source, 
                                       subtype=subtype,
                                       index=ed.index))

With uploadtask being :

    g = group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size))

When the header of the chord has more than 2 elements, the first two subtasks succeed, and the rest of the tasks in the group and the callback are not launched, without any error being sent anywhere, and without any information in the celery workers logs. After inspecting the workers, with celery inspect active, scheduled, there doesn't seem to be any waiting task in the queue.

If the header (the group) has 2 or less elements, there is no problem, the group tasks finishes, the callback is called.

It does not seem depend on the size of the elements (if each subtask in the group is sending 100 rows, we still have the same behavior for 1000 rows).

If I just launch the group tasks, without the chord and the callback, the tasks succeed without any error.

I tried using different syntaxes for the chord, and it doesn't seem to change anything.

I tried using the group.link feature to see what would happen,and the group seems to finish when doing this, but the callback doesn't happen after all the group tasks are finished ofc since it's not designed for that as I understood from the documentation, so it's not completely the behavior I want.

I'm using Celery 5.2.3 with a Redis 7.0.0 broker and a Django 3.2.13 backend with psql, with python 3.9.
Everything is running on seperate docker containers.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

谷夏 2025-02-16 05:27:13

看来直接将小组作为和弦的标题正在创建问题。它可能是将组中的第一个任务用作标题,而将第二个任务用作回调(尽管我不明白为什么这不会导致这些任务的参数引起某些错误)。
而不是返回:

group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size))

我现在返回:

[unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size)]

它正常工作。

It seems that using the group directly as the header of the chord was creating the problem. It was probably using the first task in the group as the header, and the second as the callback (though I can't understand why that didn't caused some error with the arguments of theses tasks).
Instead of returning :

group(unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size))

I now return :

[unwrap_upload_bulk.s(obj = self, data = self.data.iloc[i:i+chunk_size]) 
                                     for i in range(0, len(self.data), chunk_size)]

And it works just as expected.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文