扭曲:如何在反应器代码和线程代码之间优雅地通信?
我有一个客户端使用twisted 连接到服务器。客户端有一个可能在后台执行操作的线程。当反应堆关闭时,我必须:
1) check if the thread is doing things
2) stop it if it is
有什么优雅的方法可以做到这一点?我能做的最好的事情就是做一些令人困惑的事情,例如:
def cleanup(self):
isWorkingDF = defer.Deferred()
doneDF = defer.Deferred()
def checkIsWorking():
res = self.stuff.isWorking() #blocking call
reactor.callFromThread(isWorkingDF.callback, res)
def shutdownOrNot(isWorking):
if isWorking:
#shutdown necessary, shutdown is also a blocking call
def shutdown():
self.stuff.shutdown()
reactor.callFromThread(doneDF, None)
reactor.callInThread(shutdown)
else:
doneDF.callback(None) #no shutdown needed
isWorkingDF.addCallback(shutdownOrNot)
reactor.callInThread(checkIsWorking)
return doneDF
首先我们检查它是否正常工作。该回调的结果进入 rescallback ,它要么关闭,要么不关闭,然后触发 didDF,它扭曲等待直到关闭。
相当混乱呃!有更好的办法吗?
也许一个相关的问题是,是否有一种更优雅的方式将回调相互链接?我可以看到自己需要在完成此操作后执行更多清理代码,因此我必须推迟另一个 done
,并让当前的 doneDF
触发回调它执行一些操作,然后调用延迟完成的操作。
I have a client connected to a server using twisted. The client has a thread which might potentially be doing things in the background. When the reactor is shutting down, I have to:
1) check if the thread is doing things
2) stop it if it is
What's an elegant way to do this? The best I can do is some confused thing like:
def cleanup(self):
isWorkingDF = defer.Deferred()
doneDF = defer.Deferred()
def checkIsWorking():
res = self.stuff.isWorking() #blocking call
reactor.callFromThread(isWorkingDF.callback, res)
def shutdownOrNot(isWorking):
if isWorking:
#shutdown necessary, shutdown is also a blocking call
def shutdown():
self.stuff.shutdown()
reactor.callFromThread(doneDF, None)
reactor.callInThread(shutdown)
else:
doneDF.callback(None) #no shutdown needed
isWorkingDF.addCallback(shutdownOrNot)
reactor.callInThread(checkIsWorking)
return doneDF
First we check if it's working at all. The result of that callback goes into rescallback
which either shuts down or doesn't, and then fires the doneDF, which twisted waits for until closing.
Pretty messed up eh! Is there a better way?
Maybe a related question is, is there a more elegant way to chain callbacks to each other? I could see myself needing to do more cleanup code after this is done, so then I'd have to make a different done
deferred, and have the current doneDF
fire a callback which does stuff then calls that done
deferred..
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
啊,真正的答案是使用 defer.inlineCallbacks 装饰器。上面的代码现在变成:
Ah the real answer is to use the
defer.inlineCallbacks
decorator. The above code now becomes:您可以通过使用
deferToThread
而不是callInThread
/callFromThread
对来简化这一点:deferToThread
基本上只是一个不错的选择围绕您在函数版本中实现两次的相同线程逻辑进行包装。You can simplify this somewhat by using
deferToThread
instead of thecallInThread
/callFromThread
pairs:deferToThread
is basically just a nice wrapper around the same threading logic you had implemented twice in your version of the function.如果程序在关闭反应器后终止,则可以将该线程设置为守护线程。当所有非守护线程终止时,这将自动退出。只需在调用 start() 之前在线程对象上设置 daemon = True 即可。
如果这是不可行的,例如线程必须在退出之前进行资源清理,那么您可以使用队列在反应器和线程之间进行通信。将要完成的工作推送到队列对象上,然后让线程将其拉出并执行。有一个特殊的“FINISH”标记(或简单地“无”)来指示线程需要终止。
If the program is terminating after you shut down the reactor you could make the thread a daemon thread. This will automatically exit when all the non-daemon threads terminate. Just set
daemon = True
on the thread object before you call start().If this is not viable, e.g. the thread has to do resource cleanup before it exits then you could communicate between the reactor and the thread with a Queue. Push work to be done onto a Queue object and have the thread pull it off and do it. Have a special "FINISH" token (or simply None) to indicate that the thread needs to terminate.