如何使用 apscheduler 为机器人安排异步函数

发布于 2025-01-10 02:58:53 字数 1045 浏览 1 评论 0原文

我想使用 apscheduler 安排一个异步功能,就像

我向机器人添加作业一样,并且不知何故我在重新启动机器人后停止运行机器人,它应该继续该功能。

就像我每 3 小时安排一次异步函数,我的机器人在中间停止,然后我重新启动机器人(重新启动时我们将正常执行 schedular.start())。我想让异步函数再次继续,而无需再次添加。

我尝试了这个,但它不起作用

from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from pytz import utc

import datetime

jobstores = {
    'default': SQLAlchemyJobStore(url=database_url),
}

executors = {
    'default': AsyncIOExecutor(),
}

job_defaults = {
    'coalesce': False,
    'max_instances': 1
} 

scheduler = AsyncIOScheduler(jobstores=jobstores,
                             executors=executors,
                             job_defaults=job_defaults,
                             timezone=utc)

async def myfunc():
    print("result")

trigger_time = datetime.datetime.utcnow() + datetime.timedelta(hours=3)
scheduler.add_job(myfunc, 'interval',id=55454,run_date=trigger_time)
scheduler.start()
 

I want to schedule a async function a using apscheduler like

if i add job to the bot and somehow i stopped running a bot after restart of the bot it should continue the function .

Like i schedules a async funtion every 3 hours and my bot stopped in middle and then i restarted bot(while restarting we will do normaly schedular.start()). I want to make it that async function to continue again without adding again .

i tried this one but it is not working

from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from pytz import utc

import datetime

jobstores = {
    'default': SQLAlchemyJobStore(url=database_url),
}

executors = {
    'default': AsyncIOExecutor(),
}

job_defaults = {
    'coalesce': False,
    'max_instances': 1
} 

scheduler = AsyncIOScheduler(jobstores=jobstores,
                             executors=executors,
                             job_defaults=job_defaults,
                             timezone=utc)

async def myfunc():
    print("result")

trigger_time = datetime.datetime.utcnow() + datetime.timedelta(hours=3)
scheduler.add_job(myfunc, 'interval',id=55454,run_date=trigger_time)
scheduler.start()
 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

南冥有猫 2025-01-17 02:58:53

认为你所拥有的基本上是正确的,除了你如何安排它之外。您正在为作业的间隔提供日期时间。尝试类似的方法,

scheduler.add_job(myfunc, 'interval', hours=3)

请参阅此处以了解有关该方法的更多信息: https:// /apscheduler.readthedocs.io/en/3.x/modules/triggers/interval.html

还有 'calendarinterval' 如果您需要,它可能会很有用使用日期: https://apscheduler.readthedocs.io/en/ master/modules/triggers/calendarinterval.html

最后,如果 id=55454 是函数的参数,则需要以列表形式提供,例如
scheduler.add_job(myfunc, 'interval', hours=3, args=[55454])
https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler.add_job

Think what you have is broadly correct except for how you are scheduling it. You are supplying a datetime to the job's interval. Try something like

scheduler.add_job(myfunc, 'interval', hours=3)

See here for more on that method: https://apscheduler.readthedocs.io/en/3.x/modules/triggers/interval.html

There's also 'calendarinterval' which may be useful if you need to use dates: https://apscheduler.readthedocs.io/en/master/modules/triggers/calendarinterval.html

Lastly, if id=55454 is meant to be an argument for your function, it needs to be supplied as a list like
scheduler.add_job(myfunc, 'interval', hours=3, args=[55454])
https://apscheduler.readthedocs.io/en/latest/modules/schedulers/base.html#apscheduler.schedulers.base.BaseScheduler.add_job

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文