Sending signals from an APScheduler background thread to a Flask application

Posted 2025-01-13 14:30:07

I am trying to build an application that lets you schedule and execute multiple long-running jobs in a background thread using APScheduler. To control the job schedules and view the (live) output of the jobs, I want to send messages to the Flask application that runs in the same process (using Blinker) so I can stream them to a web client using Flask-SocketIO.

I came up with the following code but it seems send_log_update() is not being called at all. Please note that I have not yet added Flask-SocketIO to this example. I first wanted to make sure I could communicate to the Flask application before further complicating things.

Is this a sensible way to go about things? And if so: Am I doing something wrong here? I am not married to any of the used solutions specifically but I do need something like APScheduler to schedule jobs at specific times (instead of just intervals, like in this example).

I have considered the possibility of also using websockets to provide the communication between the background job and the rest of the application but that would be too unreliable. I have to process all output coming from the background process (to send to a log ingester) in addition to streaming it to a web client and I would like to keep the background job as agnostic of any databases and logging frameworks as possible.

# pip install flask apscheduler sqlalchemy blinker

from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from pytz import utc

# initialize Flask (SocketIO will be added later)
app = Flask(__name__)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from background thread and emit them
# over the websocket
@logsignal.connect_via(app)
def send_log_update(sender, log_line, context, **extra):
    # eventually I want to send this to the web client using
    # Flask-SocketIO
    print('received signal: ' + log_line)


# Background job that will run in the scheduler thread
def background_job():
    print('starting background job')
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)


if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])

    scheduler.start()
    app.run()

Comments (1)

酒儿 2025-01-20 14:30:07

The answer was quite simple: I was using @logsignal.connect_via(app), which restricts the send_log_update() handler to signals originating from the Flask app. After switching to the regular @logsignal.connect decorator, the handler got executed. I made it into a fully working example with a web interface that shows the log being streamed.

# Runs a scheduled job in a background thread using APScheduler and streams
# its output to a web client using websockets. Communication between the Flask
# thread and APScheduler thread is being done through (blinker) signals.
#
# Install dependencies (preferably in your virtualenv)
#   pip install flask apscheduler sqlalchemy blinker flask-socketio simple-websocket
# and then run with:
#   python this_script.py

from time import sleep

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from blinker import signal
from flask import Flask
from flask_socketio import SocketIO
from pytz import utc

# initialize Flask+SocketIO
app = Flask(__name__)
socketio = SocketIO(app)

# signal to communicate between background thread and Flask
logsignal = signal('log')


# handle signals coming from the background thread and emit them over
# the websocket (note: the log line is passed as the signal's sender)
@logsignal.connect
def send_log_update(log_line):
    socketio.emit('logUpdate', log_line)


# Background job that will run in the scheduler thread
def background_job():
    logsignal.send('starting job')
    sleep(3)
    logsignal.send('job done')


# configure APScheduler
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///scheduler.sqlite')
}
job_defaults = {
    'coalesce': False,
    'max_instances': 1
}

# create and start scheduler
scheduler = BackgroundScheduler(
    job_defaults=job_defaults, jobstores=jobstores, timezone=utc)


# simple websocket client for testing purposes
@app.route("/")
def info():
    return """
    <html>
    <head>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js" integrity="sha512-q/dWJ3kcmjBLU4Qc47E4A9kTB4m3wuTY7vkFJDTZKjTs8jhyGQnaUrxa0Ytd0ssMZhbNua9hE+E7Qv1j+DyZwA==" crossorigin="anonymous"></script>
    </head>
    <body>
    <h1>Streaming log</h1>
    <pre id="log"></pre>
    <script type="text/javascript" charset="utf-8">
        var socket = io();

        socket.on('logUpdate', function(msg) {
            let log = document.getElementById('log');
            log.append(msg + '\\n');
        });
    </script>
    </body>
    </html>
"""


if __name__ == '__main__':
    scheduler.add_job(background_job, 'interval', seconds=5,
                      replace_existing=True, id='sample_job',
                      args=[])

    scheduler.start()
    socketio.run(app)