填充传入消息队列并清空传出消息队列的后台 Twisted 服务器的模式?

发布于 2024-10-02 09:32:32 字数 494 浏览 1 评论 0原文

我想做这样的事情:

twistedServer.start() # This would be a nonblocking call

while True:
   while twistedServer.haveMessage():
      message = twistedServer.getMessage()
      response = handleMessage(message)
      twistedServer.sendResponse(response)
   doSomeOtherLogic()

我想做的关键是在后台线程中运行服务器。我希望使用线程而不是通过多重处理/队列来完成此操作,因为我的应用程序已经有一层消息传递,并且我想避免两层。我提出这个问题是因为我已经可以看到如何在单独的进程中执行此操作,但我想知道如何在线程中执行此操作,或者是否可以。或者,如果我可以使用其他一些模式来完成同样的事情,比如编写我自己的reactor.run方法。感谢您的任何帮助。 :)

I'd like to do something like this:

twistedServer.start() # This would be a nonblocking call

while True:
   while twistedServer.haveMessage():
      message = twistedServer.getMessage()
      response = handleMessage(message)
      twistedServer.sendResponse(response)
   doSomeOtherLogic()

The key thing I want to do is run the server in a background thread. I'm hoping to do this with a thread instead of through multiprocessing/queue because I already have one layer of messaging for my app and I'd like to avoid two. I'm bringing this up because I can already see how to do this in a separate process, but what I'd like to know is how to do it in a thread, or if I can. Or if perhaps there is some other pattern I can use that accomplishes this same thing, like perhaps writing my own reactor.run method. Thanks for any help.
:)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

泛滥成性 2024-10-09 09:32:32

我想做的关键是在后台线程中运行服务器。

不过,您没有解释为什么这是关键。一般来说,像“使用线程”这样的事情是实现细节。也许线程是合适的,也许不是,但实际目标在这一点上是不可知的。你的目标是什么?同时处理多个客户端?要同时处理此类消息和来自其他源(例如 Web 服务器)的事件?如果不知道最终目标,就无法知道我建议的实施策略是否有效。

考虑到这一点,这里有两种可能性。

首先,您可以忘记线程。这需要将上面的事件处理逻辑定义为事件处理部分。尝试获取事件的部分将被委托给应用程序的另一部分,可能最终基于反应器 API 之一(例如,您可以设置一个 TCP 服务器来接受消息并将其转换为您需要的事件)重新处理,在这种情况下,您将从调用某种reactor.listenTCP开始)。

因此,您的示例可能会变成这样(添加一些特殊性以尝试增加指导价值):

from twisted.internet import reactor

class MessageReverser(object):
    """
    Accept messages, reverse them, and send them onwards.
    """
    def __init__(self, server):
        self.server = server

    def messageReceived(self, message):
        """
        Callback invoked whenever a message is received.  This implementation
        will reverse and re-send the message.
        """
        self.server.sendMessage(message[::-1])
        doSomeOtherLogic()

def main():
    twistedServer = ...
    twistedServer.start(MessageReverser(twistedServer))
    reactor.run()

main()

有关此示例的几点注意事项:

  • 我不确定您的 twistedServer 是如何的定义的。我想象它以某种方式与网络交互。您的代码版本将让它接收消息并缓冲它们,直到它们被您的循环从缓冲区中删除以进行处理。此版本可能没有缓冲区,而是在消息到达时立即调用传递给 start 的对象的 messageReceived 方法。如果需要,您仍然可以添加某种缓冲,方法是将其放入 messageReceived 方法中。

  • 现在对 reactor.run 的调用将被阻止。您可以将此代码编写为 twistd 插件或 .tac 文件,在这种情况下,您不会直接负责启动反应器。然而,必须有人启动反应器,否则 Twisted 的大多数 API 将不会执行任何操作。当然,reactor.run 会阻塞,直到有人调用 reactor.stop

  • 此方法没有使用任何线程。 Twisted 的协作式多任务并发方法意味着只要您注意协作(这通常意味着偶尔返回到反应器),您仍然可以同时执行多项操作。

  • 调用 doSomeOtherLogic 函数的确切时间略有变化,因为没有“缓冲区现在为空”与“我刚刚处理了一条消息”的概念。您可以更改此设置,以便每秒调用一次该函数,或者在每 N 个消息之后调用一次,或者其他任何适当的方式。

第二种可能性是真正使用线程。这可能看起来与前面的示例非常相似,但您将在另一个线程而不是主线程中调用reactor.run。例如,

from Queue import Queue
from threading import Thread

class MessageQueuer(object):
    def __init__(self, queue):
        self.queue = queue

    def messageReceived(self, message):
        self.queue.put(message)

def main():
    queue = Queue()
    twistedServer = ...
    twistedServer.start(MessageQueuer(queue))
    Thread(target=reactor.run, args=(False,)).start()

    while True:
        message = queue.get()
        response = handleMessage(message)
        reactor.callFromThread(twistedServer.sendResponse, response)

main()

此版本假定一个 twistedServer 其工作方式类似,但使用线程让您拥有 while True: 循环。注意:

  • 如果使用线程,则必须调用 reactor.run(False),以防止 Twisted 尝试安装任何信号处理程序,Python 只允许在主线程中安装信号处理程序。这意味着 Ctrl-C 处理将被禁用,并且 reactor.spawnProcess 将无法可靠地工作。

  • MessageQueuerMessageReverser 具有相同的接口,只是其 messageReceived 的实现不同。它使用线程安全队列对象在反应器线程(将在其中调用它)和运行 while True: 循环的主线程之间进行通信。

  • 您必须使用reactor.callFromThread将消息发送回反应器线程(假设twistedServer.sendResponse实际上基于Twisted API)。 Twisted API 通常不是线程安全的,必须在反应器线程中调用。这就是 reactor.callFromThread 为您所做的事情。

  • 有人认为,您需要实施某种方法来停止循环和反应器。在您调用reactor.stop之前,Python进程不会完全退出。

请注意,虽然线程版本为您提供了熟悉的、所需的 while True 循环,但它实际上并没有比非线程版本做得更好。只是更复杂了。因此,请考虑您是否真的需要线程,或者它们是否只是一种可以替换为其他东西的实现技术。

The key thing I want to do is run the server in a background thread.

You don't explain why this is key, though. Generally, things like "use threads" are implementation details. Perhaps threads are appropriate, perhaps not, but the actual goal is agnostic on the point. What is your goal? To handle multiple clients concurrently? To handle messages of this sort simultaneously with events from another source (for example, a web server)? Without knowing the ultimate goal, there's no way to know if an implementation strategy I suggest will work or not.

With that in mind, here are two possibilities.

First, you could forget about threads. This would entail defining your event handling logic above as only the event handling parts. The part that tries to get an event would be delegated to another part of the application, probably something ultimately based on one of the reactor APIs (for example, you might set up a TCP server which accepts messages and turns them into the events you're processing, in which case you would start off with a call to reactor.listenTCP of some sort).

So your example might turn into something like this (with some added specificity to try to increase the instructive value):

from twisted.internet import reactor

class MessageReverser(object):
    """
    Accept messages, reverse them, and send them onwards.
    """
    def __init__(self, server):
        self.server = server

    def messageReceived(self, message):
        """
        Callback invoked whenever a message is received.  This implementation
        will reverse and re-send the message.
        """
        self.server.sendMessage(message[::-1])
        doSomeOtherLogic()

def main():
    twistedServer = ...
    twistedServer.start(MessageReverser(twistedServer))
    reactor.run()

main()

Several points to note about this example:

  • I'm not sure how your twistedServer is defined. I'm imagining that it interfaces with the network in some way. Your version of the code would have had it receiving messages and buffering them until they were removed from the buffer by your loop for processing. This version would probably have no buffer, but instead just call the messageReceived method of the object passed to start as soon as a message arrives. You could still add buffering of some sort if you want, by putting it into the messageReceived method.

  • There is now a call to reactor.run which will block. You might instead write this code as a twistd plugin or a .tac file, in which case you wouldn't be directly responsible for starting the reactor. However, someone must start the reactor, or most APIs from Twisted won't do anything. reactor.run blocks, of course, until someone calls reactor.stop.

  • There are no threads used by this approach. Twisted's cooperative multitasking approach to concurrency means you can still do multiple things at once, as long as you're mindful to cooperate (which usually means returning to the reactor once in a while).

  • The exact times the doSomeOtherLogic function is called is changed slightly, because there's no notion of "the buffer is empty for now" separate from "I just handled a message". You could change this so that the function is installed called once a second, or after every N messages, or whatever is appropriate.

The second possibility would be to really use threads. This might look very similar to the previous example, but you would call reactor.run in another thread, rather than the main thread. For example,

from Queue import Queue
from threading import Thread

class MessageQueuer(object):
    def __init__(self, queue):
        self.queue = queue

    def messageReceived(self, message):
        self.queue.put(message)

def main():
    queue = Queue()
    twistedServer = ...
    twistedServer.start(MessageQueuer(queue))
    Thread(target=reactor.run, args=(False,)).start()

    while True:
        message = queue.get()
        response = handleMessage(message)
        reactor.callFromThread(twistedServer.sendResponse, response)

main()

This version assumes a twistedServer which works similarly, but uses a thread to let you have the while True: loop. Note:

  • You must invoke reactor.run(False) if you use a thread, to prevent Twisted from trying to install any signal handlers, which Python only allows to be installed in the main thread. This means the Ctrl-C handling will be disabled and reactor.spawnProcess won't work reliably.

  • MessageQueuer has the same interface as MessageReverser, only its implementation of messageReceived is different. It uses the threadsafe Queue object to communicate between the reactor thread (in which it will be called) and your main thread where the while True: loop is running.

  • You must use reactor.callFromThread to send the message back to the reactor thread (assuming twistedServer.sendResponse is actually based on Twisted APIs). Twisted APIs are typically not threadsafe and must be called in the reactor thread. This is what reactor.callFromThread does for you.

  • You'll want to implement some way to stop the loop and the reactor, one supposes. The python process won't exit cleanly until after you call reactor.stop.

Note that while the threaded version gives you the familiar, desired while True loop, it doesn't actually do anything much better than the non-threaded version. It's just more complicated. So, consider whether you actually need threads, or if they're merely an implementation technique that can be exchanged for something else.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文