celery处理10w级别的更新记录的任务,是创建10w个任务,还是创建一个任务扫表循环处理?各有何优势?

发布于 2022-09-07 12:41:03 字数 780 浏览 13 评论 0

数据库有10w个记录,半年后可能会增加到20w,但最终应该不会超过100w

服务器配置:
python3.6 celery+rabbitMQ
云主机 ubuntu 16.04 1G 1核
数据库 postgresql 10, 有100个连接数的限制

表结构如下:

1111.png

last_update字段是上次请求的时间(每条记录我们需要至少1小时内更新一次,允许有10分钟的误差)
uuid 字段决定发起请求时传给对方api的参数

每个记录的last_update 可能不一样,是根据这个记录的添加时间而定的,以后每次更新记录,这个字段就发生变化

我们目前程序的思路是:
在celery中创建了一个任务A,这个任务每隔1小时工作一次,
查询出 更新时间在1小时之前的 的所有记录,
然后用for循环 对查询出的记录拼接url,把拼接的Url发送给异步任务B

任务B的目的很简单,根据得到的url,去请求数据,写入数据库,并更新last_update 字段

这种方式,只要创建2个celery任务即可,但是总感觉这样不太健壮
网上说celery可以支撑百万级别的任务,我就在考虑 要不要每个记录,创建一个celery任务?

斗胆发帖求助各位前辈,我这种情况,用哪种思路比较好? 大家有什么改进方案吗?

非常感谢

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烧了回忆取暖 2022-09-14 12:41:03

楼主现在都实现,就已经是每个记录对应一个 task实例了。
首先我们先做两个定义:

  1. task,就是你定义的celery方法,比如:
@celery.task
def celery_task():
  pass
  1. task实例,就是实际将要运行的任务
task_instance = celery_task.delay()

任务一,查询;任务二,遍历及更新。
所以楼主本身的设计就是:
两个task,百万级(数据足够多的话)task实例(即已经为每个符合条件的数据创建了一个任务了)。

由于评论里不是很方便回答楼主的问题,就在这里做出评论里问题的回答了。
方案一:
增加celery的消费者,及将worker数量加大。
不建议,因为不可控因素较多,还可能达不到预期效果。
方案二(个人建议,可根据情况修改):
自行增加判断标志位。
不知道楼主是怎样使用celery的,就假设通过redis完成的发布订阅任务操作了。

# 以定时任务的方式启动,没小时执行一次
@celery.task
def query_from_db():
  results = db.query
  for result in results:
    if redis.get(result.id):
      continue
    # 设置一个超时时间
    # update在一小时内成功,下次执行query_from_db任务时,仍会创建新的更新任务
    # update失败,超时后,redis删除result.id相应记录,即超时后会创建新的更新任务
    redis.set(result.id, 'something', two_hours)
    update_result.delay(result.id)
    
@celery.task
def update_result(result_id):
  result = db.query.get(result_id)
  rv = requests.get(.....)
  result.update(rv.json())
  redis.delete(result_id)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文