Set up a scheduled job?

Posted on 2024-07-13 19:41:09 · 309 words · 10 views · 0 comments


I've been working on a web app using Django, and I'm curious if there is a way to schedule a job to run periodically.

Basically I just want to run through the database and make some calculations/updates on an automatic, regular basis, but I can't seem to find any documentation on doing this.

Does anyone know how to set this up?

To clarify: I know I can set up a cron job to do this, but I'm curious if there is some feature in Django that provides this functionality. I'd like people to be able to deploy this app themselves without having to do much config (preferably zero).

I've considered triggering these actions "retroactively" by simply checking if a job should have been run since the last time a request was sent to the site, but I'm hoping for something a bit cleaner.


Comments (26)

红颜悴 2024-07-20 19:41:09


One solution that I have employed is to do this:

1) Create a custom management command, e.g.

python manage.py my_cool_command

2) Use cron (on Linux) or at (on Windows) to run my command at the required times.

This is a simple solution that doesn't require installing a heavy AMQP stack. However, there are nice advantages to using something like Celery, mentioned in the other answers. In particular, with Celery it is nice not to have to spread your application logic out into crontab files. However, the cron solution works quite nicely for small to medium-sized applications where you don't want a lot of external dependencies.

EDIT:

In later versions of Windows (Windows 8, Server 2012 and above), the at command is deprecated. You can use schtasks.exe for the same purpose.

**** UPDATE ****
Here is the new link to the Django docs for writing custom management commands.
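For reference, the scheduling entries for step 2 might look like the following (all paths are illustrative; adjust them to your own virtualenv and project layout):

```shell
# Linux: run the management command hourly, via `crontab -e`
0 * * * * /path/to/venv/bin/python /path/to/project/manage.py my_cool_command

# Windows 8 / Server 2012 and above: the schtasks.exe equivalent
schtasks /create /sc hourly /tn "MyCoolCommand" /tr "python C:\path\to\project\manage.py my_cool_command"
```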

凉城已无爱 2024-07-20 19:41:09


Celery is a distributed task queue, built on AMQP (RabbitMQ). It also handles periodic tasks in a cron-like fashion (see periodic tasks). Depending on your app, it might be worth a gander.

Celery is pretty easy to set up with django (docs), and periodic tasks will actually skip missed tasks in case of a downtime. Celery also has built-in retry mechanisms, in case a task fails.
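For illustration, with a recent Celery version the periodic part can be configured roughly like this (the project name, broker URL, task path, and schedule are assumptions for the sketch, not taken from this answer):

```python
from celery import Celery
from celery.schedules import crontab

# Hypothetical project setup; the broker URL is an example.
app = Celery('proj', broker='amqp://localhost')

# celery beat reads this mapping and dispatches the task on schedule.
app.conf.beat_schedule = {
    'nightly-db-update': {
        'task': 'app.tasks.update_database',   # hypothetical task path
        'schedule': crontab(minute=0, hour=3),  # every day at 03:00
    },
}
```

You then run a worker plus the beat scheduler (e.g. `celery -A proj worker -B`) alongside the Django app.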

独孤求败 2024-07-20 19:41:09


We've open-sourced what I think is a structured app that Brian's solution above alludes to. We would love any/all feedback!

https://github.com/tivix/django-cron

It comes with one management command:

./manage.py runcrons

That does the job. Each cron is modeled as a class (so it's all OO), each cron runs at a different frequency, and we make sure the same cron type doesn't run in parallel (in case the crons themselves take longer to run than their frequency!)

时常饿 2024-07-20 19:41:09


If you're using a standard POSIX OS, you use cron.

If you're using Windows, you use at.

Write a Django management command to

  1. Figure out what platform they're on.

  2. Either execute the appropriate "AT" command for your users, or update the crontab for your users.
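The platform dispatch in steps 1-2 can be sketched with just the standard library (the returned command strings are illustrative and not tested on every platform):

```python
import platform

def scheduler_snippet(command, crontab_line="0 * * * *"):
    """Return an illustrative shell snippet for scheduling `command`
    on the current platform (Windows vs. POSIX)."""
    if platform.system() == "Windows":
        # `at` is deprecated on modern Windows; schtasks replaces it.
        return 'schtasks /create /sc hourly /tn djangojob /tr "%s"' % command
    # Otherwise assume a POSIX system with cron available.
    return '(crontab -l; echo "%s %s") | crontab -' % (crontab_line, command)
```

A management command would print or execute this snippet for the user.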

锦欢 2024-07-20 19:41:09


Interesting new pluggable Django app: django-chronograph

You only have to add one cron entry which acts as a timer, and you have a very nice Django admin interface into the scripts to run.

双马尾 2024-07-20 19:41:09


Look at Django Poor Man's Cron, which is a Django app that makes use of spambots, search engine indexing robots and the like to run scheduled tasks at approximately regular intervals.

See: http://code.google.com/p/django-poormanscron/

山人契 2024-07-20 19:41:09


I had exactly the same requirement a while ago, and ended up solving it using APScheduler (User Guide)

It makes scheduling jobs super simple, and keeps it independent from request-based execution of some code. Following is a simple example.

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
job = None

def tick():
    print('One tick!')

def start_job():
    global job
    job = scheduler.add_job(tick, 'interval', seconds=3600)
    try:
        scheduler.start()
    except Exception:
        pass

Hope this helps somebody!

灯角 2024-07-20 19:41:09


Brian Neal's suggestion of running management commands via cron works well, but if you're looking for something a little more robust (yet not as elaborate as Celery) I'd look into a library like Kronos:

# app/cron.py

import kronos

@kronos.register('0 * * * *')
def task():
    pass
骄傲 2024-07-20 19:41:09


Django APScheduler for Scheduler Jobs. Advanced Python Scheduler (APScheduler) is a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please.

note: I'm the author of this library

Install APScheduler

pip install apscheduler

Write the function to call

file name: scheduler_jobs.py

def FirstCronTest():
    print("")
    print("I am executed..!")

Configuring the scheduler

Create an execute.py file and add the code below:

from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

Your functions are written in scheduler_jobs; import the module here and register the job:

import scheduler_jobs 

scheduler.add_job(scheduler_jobs.FirstCronTest, 'interval', seconds=10)
scheduler.start()

Link the File for Execution

Now, add the line below at the bottom of your urls.py file:

import execute
梦境 2024-07-20 19:41:09


RabbitMQ and Celery have more features and task handling capabilities than Cron. If task failure isn't an issue, and you think you will handle broken tasks in the next call, then Cron is sufficient.

Celery & AMQP will let you handle the broken task, and it will get executed again by another worker (Celery workers listen for the next task to work on), until the task's max_retries attribute is reached. You can even invoke tasks on failure, like logging the failure, or sending an email to the admin once the max_retries has been reached.

And you can distribute Celery and AMQP servers when you need to scale your application.

番薯 2024-07-20 19:41:09


I personally use cron, but the jobs scheduling part of django-extensions looks interesting.

a√萤火虫的光℡ 2024-07-20 19:41:09


Although not part of Django, Airflow is a more recent project (as of 2016) that is useful for task management.

Airflow is a workflow automation and scheduling system that can be used to author and manage data pipelines. A web-based UI provides the developer with a range of options for managing and viewing these pipelines.

Airflow is written in Python and is built using Flask.

Airflow was created by Maxime Beauchemin at Airbnb and open sourced in the spring of 2015. It joined the Apache Software Foundation's incubation program in the winter of 2016. Here is the Git project page and some additional background information.

余生共白头 2024-07-20 19:41:09


Put the following at the top of your cron.py file:

#!/usr/bin/python
import os, sys
sys.path.append('/path/to/') # the parent directory of the project
sys.path.append('/path/to/project') # these lines only needed if not on path
os.environ['DJANGO_SETTINGS_MODULE'] = 'myproj.settings'

# imports and code below
最笨的告白 2024-07-20 19:41:09


I just thought about this rather simple solution:

  1. Define a view function do_work(req, param) like you would with any other view, with URL mapping, return a HttpResponse and so on.
  2. Set up a cron job with your timing preferences (or using AT or Scheduled Tasks in Windows) which runs curl http://localhost/your/mapped/url?param=value.

You can add parameters by just adding them to the URL.

Tell me what you guys think.

[Update] I'm now using runjob command from django-extensions instead of curl.

My cron looks something like this:

@hourly python /path/to/project/manage.py runjobs hourly

... and so on for daily, monthly, etc. You can also set it up to run a specific job.

I find it more manageable and cleaner. It doesn't require mapping a URL to a view. You just define your job class and crontab and you're set.

寄离 2024-07-20 19:41:09


After this part of the code, I can write anything, just like in my views.py :)

#######################################
import os,sys
sys.path.append('/home/administrator/development/store')
os.environ['DJANGO_SETTINGS_MODULE']='store.settings'
from django.core.management import setup_environ
from store import settings
setup_environ(settings)
#######################################

From: http://www.cotellese.net/2007/09/27/running-external-scripts-against-django-models/

开始看清了 2024-07-20 19:41:09


You should definitely check out django-q!
It requires no additional configuration and has quite possibly everything needed to handle any production issues on commercial projects.

It's actively developed and integrates very well with django, django ORM, mongo, redis. Here is my configuration:

# django-q
# -------------------------------------------------------------------------
# See: http://django-q.readthedocs.io/en/latest/configure.html
Q_CLUSTER = {
    # Match recommended settings from docs.
    'name': 'DjangoORM',
    'workers': 4,
    'queue_limit': 50,
    'bulk': 10,
    'orm': 'default',

    # Custom Settings
    # ---------------
    # Limit the amount of successful tasks saved to Django.
    'save_limit': 10000,

    # See https://github.com/Koed00/django-q/issues/110.
    'catch_up': False,

    # Number of seconds a worker can spend on a task before it's terminated.
    'timeout': 60 * 5,

    # Number of seconds a broker will wait for a cluster to finish a task
    # before presenting it again. This needs to be longer than `timeout`,
    # otherwise the same task will be processed multiple times.
    'retry': 60 * 6,

    # Whether to force all async() calls to be run with sync=True
    # (making them synchronous).
    'sync': False,

    # Redirect worker exceptions directly to Sentry error reporter.
    'error_reporter': {
        'sentry': RAVEN_CONFIG,
    },
}
温折酒 2024-07-20 19:41:09


Yes, the methods above are great, and I tried some of them. In the end, I arrived at a method like this:

    from threading import Timer

    interval = 3600  # seconds between runs

    def sync():
        # do something...

        sync_timer = Timer(interval, sync)
        sync_timer.start()

    sync()

It just re-schedules itself, like recursion.

Ok, I hope this method can meet your requirements. :)

錯遇了你 2024-07-20 19:41:09


A more modern solution (compared to Celery) is Django Q:
https://django-q.readthedocs.io/en/latest/index.html

It has great documentation and is easy to grok. Windows support is lacking, because Windows does not support process forking. But it works fine if you create your dev environment using the Windows Subsystem for Linux.

笑饮青盏花 2024-07-20 19:41:09


I had a problem similar to yours today.

I didn't want it handled by the server through cron (and most of the libs were just cron helpers in the end).

So I created a scheduling module and attached it to the init.

It's not the best approach, but it helps me keep all the code in a single place, with its execution tied to the main app.

故事和酒 2024-07-20 19:41:09


I use Celery to create my periodic tasks. First you need to install it as follows:

pip install django-celery

Don't forget to register django-celery in your settings and then you could do something like this:

from celery import task
from celery.decorators import periodic_task
from celery.task.schedules import crontab
from celery.utils.log import get_task_logger

@periodic_task(run_every=crontab(minute="0", hour="23"))
def do_every_midnight():
    # your code
意犹 2024-07-20 19:41:09


I am not sure whether this will be useful for anyone, but since I had to let other users of the system schedule jobs without giving them access to the actual server's (Windows) Task Scheduler, I created this reusable app.

Please note that users have access to a shared folder on the server where they can create the required command/task/.bat file. This task can then be scheduled using this app.

App name is Django_Windows_Scheduler

Screenshot: [image omitted]

只是在用心讲痛 2024-07-20 19:41:09


If you want something more reliable than Celery, try TaskHawk which is built on top of AWS SQS/SNS.

Refer: http://taskhawk.readthedocs.io

孤寂小茶 2024-07-20 19:41:09


For simple dockerized projects, I could not really see any existing answer that fit.

So I wrote a very barebones solution with no need for external libraries or triggers; it runs on its own. No external os-cron is needed, and it should work in every environment.

It works by adding a middleware: middleware.py

import threading

def should_run(name, seconds_interval):
    from application.models import CronJob
    from django.utils.timezone import now

    try:
        c = CronJob.objects.get(name=name)
    except CronJob.DoesNotExist:
        CronJob(name=name, last_ran=now()).save()
        return True

    if (now() - c.last_ran).total_seconds() >= seconds_interval:
        c.last_ran = now()
        c.save()
        return True

    return False


class CronTask:
    def __init__(self, name, seconds_interval, function):
        self.name = name
        self.seconds_interval = seconds_interval
        self.function = function


def cron_worker(*_):
    if not should_run("main", 60):
        return

    # customize this part:
    from application.models import Event
    tasks = [
        CronTask("events", 60 * 30, Event.clean_stale_objects),
        # ...
    ]

    for task in tasks:
        if should_run(task.name, task.seconds_interval):
            task.function()


def cron_middleware(get_response):

    def middleware(request):
        response = get_response(request)
        threading.Thread(target=cron_worker).start()
        return response

    return middleware

models/cron.py:

from django.db import models


class CronJob(models.Model):
    name = models.CharField(max_length=10, primary_key=True)
    last_ran = models.DateTimeField()

settings.py:

MIDDLEWARE = [
    ...
    'application.middleware.cron_middleware',
    ...
]
梦在夏天 2024-07-20 19:41:09


A simple way is to write a custom shell command (see the Django documentation) and execute it using a cronjob on Linux. However, I would highly recommend using a message broker like RabbitMQ coupled with Celery. Maybe you can have a look at this tutorial.

一个人的旅程 2024-07-20 19:41:09


One alternative is to use Rocketry:

from rocketry import Rocketry
from rocketry.conds import daily, after_success

app = Rocketry()

@app.task(daily.at("10:00"))
def do_daily():
    ...

@app.task(after_success(do_daily))
def do_after_another():
    ...

if __name__ == "__main__":
    app.run()

It also supports custom conditions:

from pathlib import Path

@app.cond()
def file_exists(file):
    return Path(file).exists()

@app.task(daily & file_exists("myfile.csv"))
def do_custom():
    ...

And it also supports Cron:

from rocketry.conds import cron

@app.task(cron('*/2 12-18 * Oct Fri'))
def do_cron():
    ...

It can be integrated quite nicely with FastAPI, and I think it could be integrated with Django as well, as Rocketry is essentially just a sophisticated loop that can spawn async tasks, threads and processes.

Disclaimer: I'm the author.

酷到爆炸 2024-07-20 19:41:09


Another option, similar to Brian Neal's answer, is to use RunScripts.

Then you don't need to set up commands. This has the advantage of more flexible or cleaner folder structures.

This file must implement a run() function. This is what gets called when you run the script. You can import any models or other parts of your django project to use in these scripts.

And then, just

python manage.py runscript path.to.script
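A minimal sketch of such a script (the file name and body are hypothetical; django-extensions just looks for a module under a scripts/ package that defines run()):

```python
# scripts/update_stats.py -- hypothetical script module;
# `python manage.py runscript update_stats` would call run().
def run():
    # Import models lazily, once Django has been set up, e.g.:
    # from myapp.models import Entry
    # Entry.objects.update(...)
    print("running scheduled update")
    return "done"
```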