How do I stop a Tornado web server?

Posted on 2024-10-25 04:38:39

I've been playing around a bit with the Tornado web server and have come to a point where I want to stop the web server (for example during unit testing). The following simple example exists on the Tornado web page:

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Once tornado.ioloop.IOLoop.instance().start() is called, it blocks the program (or current thread). Reading the source code for the IOLoop object gives this example in the documentation for the stop function:

To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
  ioloop = IOLoop()
  async_method(ioloop=ioloop, callback=ioloop.stop)
  ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.

However, I have no idea how to integrate this into my program. I actually have a class that encapsulates the web server (with its own start and stop functions), but as soon as I call start, the program (or test) blocks anyway.

I've tried to start the web server in another process (using the multiprocessing package). This is the class that is wrapping the web server:

class Server:
    def __init__(self, port=8888):
        self.application = tornado.web.Application([ (r"/", Handler) ])

        def server_thread(application, port):
            http_server = tornado.httpserver.HTTPServer(application)
            http_server.listen(port)
            tornado.ioloop.IOLoop.instance().start()

        self.process = Process(target=server_thread,
                               args=(self.application, port,))

    def start(self):
        self.process.start()

    def stop(self):
        ioloop = tornado.ioloop.IOLoop.instance()
        ioloop.add_callback(ioloop.stop)

However, stop does not seem to entirely stop the web server since it is still running in the next test, even with this test setup:

def setup_method(self, _function):
    self.server = Server()
    self.server.start()
    time.sleep(0.5)  # Wait for web server to start

def teardown_method(self, _function):
    self.kstore.stop()
    time.sleep(0.5)

How can I start and stop a Tornado web server from within a Python program?

Comments (9)

墨小沫ゞ 2024-11-01 04:38:47

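Just add this before the call to start():

IOLoop.instance().add_timeout(datetime.timedelta(seconds=10), IOLoop.instance().stop)

This registers stop as a callback on the loop and fires it 10 seconds later. The delay is passed as a datetime.timedelta (which needs import datetime) because add_timeout treats a bare number as an absolute timestamp, so a plain 10 would be a deadline in the past and would stop the loop immediately.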
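
A minimal sketch of the same timed stop, assuming Python 3 and Tornado 5.0 or later (where the IOLoop wraps an asyncio event loop). It uses IOLoop.call_later, which takes a relative delay in seconds. The helper name run_for and the 0.2 second delay are illustrative only and are not part of the answer above.

```python
import asyncio
import time

from tornado.ioloop import IOLoop


def run_for(seconds):
    """Run an IOLoop on this thread and stop it after `seconds` (hypothetical helper)."""
    # Give the thread a fresh asyncio loop so IOLoop.current() has one to wrap.
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = IOLoop.current()
    # call_later takes a relative delay, unlike a bare number given to add_timeout.
    loop.call_later(seconds, loop.stop)
    started = time.monotonic()
    loop.start()  # blocks until the scheduled loop.stop() runs
    loop.close()
    return time.monotonic() - started


if __name__ == "__main__":
    print("loop ran for %.2f seconds" % run_for(0.2))
```

In a real server the listen() call would go before loop.start(); the timer itself does not close any listening socket.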

音栖息无 2024-11-01 04:38:46

Tornado's IOLoop.instance() has trouble stopping from an external signal when run under multiprocessing.Process.

The only solution I came up with that works consistently is to use Process.terminate():

import multiprocessing
import time

import tornado.ioloop
import tornado.web

class Handler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([ (r"/", Handler) ])

class worker(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        # Bind the port in the parent; a forked child inherits the socket.
        application.listen(8888)
        self.ioloop = tornado.ioloop.IOLoop.instance()

    def run(self):
        # Runs in the child process and blocks there.
        self.ioloop.start()

    def stop(self, timeout=0):
        # Called in the parent: this only touches the parent's copy of the loop,
        # so terminate() is what actually ends the child.
        self.ioloop.stop()
        time.sleep(timeout)
        self.terminate()

if __name__ == "__main__":
    w = worker()
    print('starting server')
    w.start()
    t = 2
    print('waiting {} seconds before stopping'.format(t))
    for i in range(t):
        time.sleep(1)
        print(i)
    print('stopping')
    w.stop(1)
    print('stopped')

甜警司 2024-11-01 04:38:46

We wanted to run a tornado.ioloop.IOLoop inside a multiprocessing.Process to work around the CPython GIL, for performance and isolation. To reach the IOLoop from outside the process, we use a Queue to pass the shutdown signal through.

Here is a minimal example:

class Server(BokehServer):

    def start(self, signal=None):
        logger.info('Starting server on http://localhost:%d'
                    % (self.port))

        if signal is not None:
            def shutdown():
                if not signal.empty():
                    self.stop()
            tornado.ioloop.PeriodicCallback(shutdown, 1000).start()

        BokehServer.start(self)
        self.ioloop.start()

    def stop(self, *args, **kwargs):  # args important for signals
        logger.info('Stopping server...')
        BokehServer.stop(self)
        self.ioloop.stop()

The process:

import multiprocessing as mp
import signal

from server import Server  # noqa

class ServerProcess(mp.Process):
    def __init__(self, *args, **kwargs):
        self.server = Server(*args, **kwargs)
        self.shutdown_signal = mp.Queue(1)
        mp.Process.__init__(self)

        signal.signal(signal.SIGTERM, self.server.stop)
        signal.signal(signal.SIGINT, self.server.stop)

    def run(self):
        self.server.start(signal=self.shutdown_signal)

    def stop(self):
        self.shutdown_signal.put(True)

if __name__ == '__main__':
    p = ServerProcess()
    p.start()

Cheers!
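
The polling idea above can be sketched without BokehServer. This is only an illustration, assuming Python 3 and Tornado 5.0 or later: the function name run_until_signalled and the 50 ms poll interval are made up here, and a queue.Queue filled from another thread stands in for the multiprocessing queue (both expose the empty() method the check relies on).

```python
import asyncio
import queue
import threading

from tornado.ioloop import IOLoop, PeriodicCallback


def run_until_signalled(shutdown_queue, poll_ms=50):
    """Run an IOLoop on this thread until something is put on shutdown_queue."""
    # Give the thread its own asyncio loop for IOLoop.current() to wrap.
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = IOLoop.current()

    def check_for_shutdown():
        # Runs on the loop's own thread, so calling loop.stop() directly is safe.
        if not shutdown_queue.empty():
            poller.stop()
            loop.stop()

    poller = PeriodicCallback(check_for_shutdown, poll_ms)
    poller.start()
    loop.start()  # blocks until check_for_shutdown() stops the loop
    loop.close()


if __name__ == "__main__":
    signal_queue = queue.Queue(1)
    runner = threading.Thread(target=run_until_signalled, args=(signal_queue,))
    runner.start()
    signal_queue.put(True)  # request shutdown from the outside
    runner.join(timeout=5)
    print("loop stopped:", not runner.is_alive())
```

The trade-off of polling is latency: the loop notices the request at most one poll interval after it was queued.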

鱼窥荷 2024-11-01 04:38:45

If you do not want to bother with threads, you can catch the keyboard interrupt instead:

try:
    tornado.ioloop.IOLoop.instance().start()
# signal: CTRL + BREAK on Windows or CTRL + C on Linux
except KeyboardInterrupt:
    tornado.ioloop.IOLoop.instance().stop()

捶死心动 2024-11-01 04:38:45

There is a problem with Zaar Hai's solution: it leaves the socket open. I was looking for a way to stop Tornado because I run unit tests against my app server and needed to start and stop the server between tests to get a clean state (empty session, etc.). With the socket left open, the second test always ran into an Address already in use error. So I came up with the following:

import logging as log
from time import sleep
from threading import Thread

import tornado.ioloop
from tornado.httpserver import HTTPServer


server = None
thread = None


def start_app():
    def start():
        global server
        server = HTTPServer(create_app())
        server.listen(TEST_PORT, TEST_HOST)
        tornado.ioloop.IOLoop.instance().start()
    global thread
    thread = Thread(target=start)
    thread.start()
    # wait for the server to fully initialize
    sleep(0.5)


def stop_app():
    server.stop()
    # silence StreamClosedError Tornado is throwing after it is stopped
    log.getLogger().setLevel(log.FATAL)
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)
    thread.join()

So the main idea here is to keep a reference to the HTTPServer instance and call its stop() method. create_app() just returns an Application instance configured with handlers. Now you can use these methods in your unit tests like this:

import unittest

class FoobarTest(unittest.TestCase):

    def setUp(self):
        start_app()

    def tearDown(self):
        stop_app()

    def test_foobar(self):
        # here the server is up and running, so you can make requests to it
        pass

﹏雨一样淡蓝的深情 2024-11-01 04:38:45

To stop the entire ioloop, simply invoke the ioloop.stop method when you have finished the unit test. (Remember that the only documented thread-safe method is ioloop.add_callback, i.e. if the unit tests are executed by another thread, you can wrap the stop invocation in a callback.)

If it is enough to stop the HTTP web server, invoke the httpserver.stop() method.
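
Both steps can be combined in one wrapper. The following is a rough sketch rather than a definitive implementation, assuming Python 3 and Tornado 5.0 or later: the class name ServerThread, the fetch helper, and the use of tornado.testing.bind_unused_port to pick a free port are choices made for this illustration. From Tornado 5.0 on, IOLoop.instance() is an alias for IOLoop.current(), which is per-thread, so the sketch keeps its own reference to the server thread's loop instead of looking it up from the calling thread.

```python
import asyncio
import http.client
import threading

import tornado.web
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.testing import bind_unused_port


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")


class ServerThread:
    """Hypothetical wrapper: one Tornado HTTP server running on its own thread."""

    def __init__(self):
        self.port = None
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def _run(self):
        # Give this thread its own event loop; IOLoop.current() then wraps it.
        asyncio.set_event_loop(asyncio.new_event_loop())
        self._loop = IOLoop.current()
        sock, self.port = bind_unused_port()
        self._server = HTTPServer(tornado.web.Application([(r"/", MainHandler)]))
        self._server.add_sockets([sock])
        self._loop.add_callback(self._ready.set)  # fires once the loop is running
        self._loop.start()  # blocks until _shutdown() stops the loop
        self._loop.close()

    def start(self):
        self._thread.start()
        self._ready.wait(timeout=5)

    def _shutdown(self):
        # Runs on the server thread: release the listening socket, then stop the loop.
        self._server.stop()
        self._loop.stop()

    def stop(self):
        # add_callback is the thread-safe way to hand work to the loop's thread.
        self._loop.add_callback(self._shutdown)
        self._thread.join(timeout=5)


def fetch(port):
    """Blocking GET / against the local server (runs on the caller's thread)."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", "/")
        return conn.getresponse().read()
    finally:
        conn.close()


if __name__ == "__main__":
    server = ServerThread()
    server.start()
    print(fetch(server.port))
    server.stop()
```

In a test suite, start() and stop() would be called from setUp and tearDown; each instance binds a fresh port, so a leftover socket from one test cannot collide with the next.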

北座城市 2024-11-01 04:38:45

If you need this behavior for unit testing, take a look at tornado.testing.AsyncTestCase.

By default, a new IOLoop is constructed for each test and is available as self.io_loop. This IOLoop should be used in the construction of HTTP clients/servers, etc. If the code being tested requires a global IOLoop, subclasses should override get_new_ioloop to return it.
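
As a small illustration of that approach, here is a sketch built on tornado.testing.AsyncHTTPTestCase, the HTTP-oriented subclass of AsyncTestCase, which starts a server on an unused port for each test and shuts it down afterwards. The handler, the test class name HelloTest and the run_tests helper are invented for this example.

```python
import unittest

import tornado.web
from tornado.testing import AsyncHTTPTestCase


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")


class HelloTest(AsyncHTTPTestCase):
    def get_app(self):
        # Called during setUp(); the returned app is served on an unused port.
        return tornado.web.Application([(r"/", MainHandler)])

    def test_hello(self):
        # self.fetch() runs the test's IOLoop until the response arrives.
        response = self.fetch("/")
        self.assertEqual(response.code, 200)
        self.assertEqual(response.body, b"Hello, world")


def run_tests():
    """Run HelloTest programmatically and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(HelloTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    run_tests()
```

With this setup there is no explicit start or stop call in the test code; the test case owns the loop and the server for the duration of each test.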

If you need to start and stop an IOLoop for some other purpose and can't call ioloop.stop() from a callback for some reason, a multi-threaded implementation is possible. You cannot avoid race conditions by guarding the ioloop with a lock, however. Something like the following will result in deadlock:

Thread 1:

with lock:
    ioloop.start()

Thread 2:

with lock:
    ioloop.stop()

because thread 1 will never release the lock (start() is blocking) and thread 2 will wait till the lock is released to stop the ioloop.

The only way to do it is for thread 2 to call ioloop.add_callback(ioloop.stop), which will call stop() on thread 1 in the event loop's next iteration. Rest assured, ioloop.add_callback() is thread-safe.

对不⑦ 2024-11-01 04:38:43

Here is how to stop Tornado from another thread. Schildmeijer provided a good hint, but it took me a while to work out a final example that actually works.

Please see below:

import threading
import time

import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world!\n")

def start_tornado(*args, **kwargs):
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    print("Starting Tornado")
    tornado.ioloop.IOLoop.instance().start()
    print("Tornado finished")

def stop_tornado():
    ioloop = tornado.ioloop.IOLoop.instance()
    # add_callback is thread-safe, so stop() runs on the loop's own thread
    ioloop.add_callback(ioloop.stop)
    print("Asked Tornado to exit")

def main():

    t = threading.Thread(target=start_tornado)
    t.start()

    time.sleep(5)

    stop_tornado()
    t.join()

if __name__ == "__main__":
    main()

比忠 2024-11-01 04:38:42

I just ran into this issue myself and, using info from this thread, came up with the following. I simply took my working standalone Tornado code (copied from all the examples) and moved the actual starting code into a function. I then ran that function on a threading thread. My case was different in that the threading call was made from my existing code, where I just imported the startTornado and stopTornado routines.

The suggestion above seemed to work great, so I figured I would supply the missing example code. I tested this code under Linux on an FC16 system (and fixed my initial typo).

import tornado.ioloop, tornado.web

class Handler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

application = tornado.web.Application([ (r"/", Handler) ])

def startTornado():
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

def stopTornado():
    # Schedule stop() via add_callback, the documented thread-safe entry point,
    # instead of calling it directly from another thread.
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)

if __name__ == "__main__":
    import time, threading
    threading.Thread(target=startTornado).start()
    print("Your web server will self destruct in 2 minutes")
    time.sleep(120)
    stopTornado()

Hope this helps the next person.
