Sending signals from an APScheduler background thread to a Flask application
I am trying to build an application that lets you schedule and execute multiple long-running jobs in a background thread using APScheduler. To control the job schedules and view the (live) output of the jobs, I want to send messages to the Flask application that runs in the same process (using Blinker) so I can stream them to a web client using Flask-SocketIO.
I came up with the following code, but it seems send_log_update() is not being called at all. Please note that I have not yet added Flask-SocketIO to this example; I first wanted to make sure I could communicate with the Flask application before complicating things further.
Is this a sensible way to go about things? And if so: am I doing something wrong here? I am not married to any of the solutions used here specifically, but I do need something like APScheduler to schedule jobs at specific times (instead of just at intervals, as in this example).
I have also considered using websockets for the communication between the background job and the rest of the application, but that would be too unreliable. I have to process all output coming from the background process (to send to a log ingester) in addition to streaming it to a web client, and I would like to keep the background job as agnostic of any databases and logging frameworks as possible.
# pip install flask apscheduler sqlalchemy blinker
from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from pytz import utc

# initialize Flask+SocketIO
app = Flask(__name__)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from background thread and emit them
# over the websocket
@logsignal.connect_via(app)
def send_log_update(sender, log_line, context, **extra):
    # eventually I want to send this to the web client using
    # Flask-SocketIO
    print('received signal: ' + log_line)


# Background job that will run in the scheduler thread
def background_job():
    print('starting background job')
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)

if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])
    scheduler.start()
    app.run()
1 Answer
The answer was quite simple: I was using @logsignal.connect_via(app), which restricts the send_log_update() handler to signals whose sender is the Flask app. After switching to the regular @logsignal.connect method, the handler got executed. I made it into a fully working example with a web interface that shows the log being streamed.
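To illustrate the difference between the two connect styles, here is a minimal, self-contained sketch. It is an illustration, not the full working example mentioned above: FakeApp stands in for the Flask app object, and passing the message as a log_line keyword argument is my addition (the original code passed the message string as the sender).

# pip install blinker
from blinker import signal

logsignal = signal('log')


class FakeApp:
    """Stand-in for the Flask app object (illustration only)."""


app = FakeApp()


# connect_via(app) only fires for logsignal.send(app, ...); the
# background job sends with a different sender, so this handler
# stays silent -- exactly the bug in the question
@logsignal.connect_via(app)
def only_from_app(sender, **extra):
    print('via app: ' + str(extra.get('log_line')))


# plain connect has no sender filter and fires for every send
@logsignal.connect
def from_anywhere(sender, **extra):
    print('any sender: ' + str(extra.get('log_line')))


logsignal.send('background-job', log_line='starting job')
# -> only from_anywhere() runs
logsignal.send(app, log_line='from the app')
# -> both handlers run

Passing the message as a keyword argument, rather than as the sender the way the original background_job() did, keeps the sender slot free to identify who sent the signal, which is exactly what connect_via() filters on.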