填充传入消息队列并清空传出消息队列的后台 Twisted 服务器的模式?
我想做这样的事情:
twistedServer.start() # This would be a nonblocking call
while True:
while twistedServer.haveMessage():
message = twistedServer.getMessage()
response = handleMessage(message)
twistedServer.sendResponse(response)
doSomeOtherLogic()
我想做的关键是在后台线程中运行服务器。我希望使用线程而不是通过多重处理/队列来完成此操作,因为我的应用程序已经有一层消息传递,并且我想避免两层。我提出这个问题是因为我已经可以看到如何在单独的进程中执行此操作,但我想知道如何在线程中执行此操作,或者是否可以。或者,如果我可以使用其他一些模式来完成同样的事情,比如编写我自己的reactor.run方法。感谢您的任何帮助。 :)
I'd like to do something like this:
twistedServer.start() # This would be a nonblocking call
while True:
while twistedServer.haveMessage():
message = twistedServer.getMessage()
response = handleMessage(message)
twistedServer.sendResponse(response)
doSomeOtherLogic()
The key thing I want to do is run the server in a background thread. I'm hoping to do this with a thread instead of through multiprocessing/queue because I already have one layer of messaging for my app and I'd like to avoid two. I'm bringing this up because I can already see how to do this in a separate process, but what I'd like to know is how to do it in a thread, or if I can. Or if perhaps there is some other pattern I can use that accomplishes this same thing, like perhaps writing my own reactor.run method. Thanks for any help.
:)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
不过,您没有解释为什么这是关键。一般来说,像“使用线程”这样的事情是实现细节。也许线程是合适的,也许不是,但实际目标在这一点上是不可知的。你的目标是什么?同时处理多个客户端?要同时处理此类消息和来自其他源(例如 Web 服务器)的事件?如果不知道最终目标,就无法知道我建议的实施策略是否有效。
考虑到这一点,这里有两种可能性。
首先,您可以忘记线程。这需要将上面的事件处理逻辑定义为仅事件处理部分。尝试获取事件的部分将被委托给应用程序的另一部分,可能最终基于反应器 API 之一(例如,您可以设置一个 TCP 服务器来接受消息并将其转换为您需要的事件)重新处理,在这种情况下,您将从调用某种reactor.listenTCP开始)。
因此,您的示例可能会变成这样(添加一些特殊性以尝试增加指导价值):
有关此示例的几点注意事项:
我不确定您的
twistedServer
是如何的定义的。我想象它以某种方式与网络交互。您的代码版本将让它接收消息并缓冲它们,直到它们被您的循环从缓冲区中删除以进行处理。此版本可能没有缓冲区,而是在消息到达时立即调用传递给start
的对象的messageReceived
方法。如果需要,您仍然可以添加某种缓冲,方法是将其放入messageReceived
方法中。现在对
reactor.run
的调用将被阻止。您可以将此代码编写为twistd
插件或 .tac 文件,在这种情况下,您不会直接负责启动反应器。然而,必须有人启动反应器,否则 Twisted 的大多数 API 将不会执行任何操作。当然,reactor.run
会阻塞,直到有人调用reactor.stop
。此方法没有使用任何线程。 Twisted 的协作式多任务并发方法意味着只要您注意协作(这通常意味着偶尔返回到反应器),您仍然可以同时执行多项操作。
调用 doSomeOtherLogic 函数的确切时间略有变化,因为没有“缓冲区现在为空”与“我刚刚处理了一条消息”的概念。您可以更改此设置,以便每秒调用一次该函数,或者在每 N 个消息之后调用一次,或者其他任何适当的方式。
第二种可能性是真正使用线程。这可能看起来与前面的示例非常相似,但您将在另一个线程而不是主线程中调用reactor.run。例如,
此版本假定一个
twistedServer
其工作方式类似,但使用线程让您拥有while True:
循环。注意:如果使用线程,则必须调用
reactor.run(False)
,以防止 Twisted 尝试安装任何信号处理程序,Python 只允许在主线程中安装信号处理程序。这意味着 Ctrl-C 处理将被禁用,并且reactor.spawnProcess
将无法可靠地工作。MessageQueuer
与MessageReverser
具有相同的接口,只是其messageReceived
的实现不同。它使用线程安全队列对象在反应器线程(将在其中调用它)和运行while True:
循环的主线程之间进行通信。您必须使用
reactor.callFromThread
将消息发送回反应器线程(假设twistedServer.sendResponse
实际上基于Twisted API)。 Twisted API 通常不是线程安全的,必须在反应器线程中调用。这就是reactor.callFromThread
为您所做的事情。有人认为,您需要实施某种方法来停止循环和反应器。在您调用reactor.stop之前,Python进程不会完全退出。
请注意,虽然线程版本为您提供了熟悉的、所需的
while True
循环,但它实际上并没有比非线程版本做得更好。只是更复杂了。因此,请考虑您是否真的需要线程,或者它们是否只是一种可以替换为其他东西的实现技术。You don't explain why this is key, though. Generally, things like "use threads" are implementation details. Perhaps threads are appropriate, perhaps not, but the actual goal is agnostic on the point. What is your goal? To handle multiple clients concurrently? To handle messages of this sort simultaneously with events from another source (for example, a web server)? Without knowing the ultimate goal, there's no way to know if an implementation strategy I suggest will work or not.
With that in mind, here are two possibilities.
First, you could forget about threads. This would entail defining your event handling logic above as only the event handling parts. The part that tries to get an event would be delegated to another part of the application, probably something ultimately based on one of the reactor APIs (for example, you might set up a TCP server which accepts messages and turns them into the events you're processing, in which case you would start off with a call to reactor.listenTCP of some sort).
So your example might turn into something like this (with some added specificity to try to increase the instructive value):
Several points to note about this example:
I'm not sure how your
twistedServer
is defined. I'm imagining that it interfaces with the network in some way. Your version of the code would have had it receiving messages and buffering them until they were removed from the buffer by your loop for processing. This version would probably have no buffer, but instead just call themessageReceived
method of the object passed tostart
as soon as a message arrives. You could still add buffering of some sort if you want, by putting it into themessageReceived
method.There is now a call to
reactor.run
which will block. You might instead write this code as atwistd
plugin or a .tac file, in which case you wouldn't be directly responsible for starting the reactor. However, someone must start the reactor, or most APIs from Twisted won't do anything.reactor.run
blocks, of course, until someone callsreactor.stop
.There are no threads used by this approach. Twisted's cooperative multitasking approach to concurrency means you can still do multiple things at once, as long as you're mindful to cooperate (which usually means returning to the reactor once in a while).
The exact times the
doSomeOtherLogic
function is called is changed slightly, because there's no notion of "the buffer is empty for now" separate from "I just handled a message". You could change this so that the function is installed called once a second, or after every N messages, or whatever is appropriate.The second possibility would be to really use threads. This might look very similar to the previous example, but you would call
reactor.run
in another thread, rather than the main thread. For example,This version assumes a
twistedServer
which works similarly, but uses a thread to let you have thewhile True:
loop. Note:You must invoke
reactor.run(False)
if you use a thread, to prevent Twisted from trying to install any signal handlers, which Python only allows to be installed in the main thread. This means the Ctrl-C handling will be disabled andreactor.spawnProcess
won't work reliably.MessageQueuer
has the same interface asMessageReverser
, only its implementation ofmessageReceived
is different. It uses the threadsafe Queue object to communicate between the reactor thread (in which it will be called) and your main thread where thewhile True:
loop is running.You must use
reactor.callFromThread
to send the message back to the reactor thread (assumingtwistedServer.sendResponse
is actually based on Twisted APIs). Twisted APIs are typically not threadsafe and must be called in the reactor thread. This is whatreactor.callFromThread
does for you.You'll want to implement some way to stop the loop and the reactor, one supposes. The python process won't exit cleanly until after you call
reactor.stop
.Note that while the threaded version gives you the familiar, desired
while True
loop, it doesn't actually do anything much better than the non-threaded version. It's just more complicated. So, consider whether you actually need threads, or if they're merely an implementation technique that can be exchanged for something else.