扭曲:如何在反应器代码和线程代码之间优雅地通信?

发布于 2024-09-14 10:12:56 字数 1163 浏览 6 评论 0原文

我有一个客户端使用twisted 连接到服务器。客户端有一个可能在后台执行操作的线程。当反应堆关闭时,我必须:

1) check if the thread is doing things
2) stop it if it is

有什么优雅的方法可以做到这一点?我能做的最好的事情就是做一些令人困惑的事情,例如:

def cleanup(self):
    isWorkingDF = defer.Deferred()
    doneDF = defer.Deferred()

    def checkIsWorking():
        res = self.stuff.isWorking() #blocking call
        reactor.callFromThread(isWorkingDF.callback, res)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            def shutdown():
                self.stuff.shutdown()
                reactor.callFromThread(doneDF, None)
            reactor.callInThread(shutdown)                
        else:
            doneDF.callback(None) #no shutdown needed

    isWorkingDF.addCallback(shutdownOrNot)

    reactor.callInThread(checkIsWorking)

    return doneDF

首先我们检查它是否正常工作。该回调的结果进入 rescallback ,它要么关闭,要么不关闭,然后触发 didDF,它扭曲等待直到关闭。

相当混乱呃!有更好的办法吗?

也许一个相关的问题是,是否有一种更优雅的方式将回调相互链接?我可以看到自己需要在完成此操作后执行更多清理代码,因此我必须推迟另一个 done ,并让当前的 doneDF 触发回调它执行一些操作,然后调用延迟完成的操作。

I have a client connected to a server using twisted. The client has a thread which might potentially be doing things in the background. When the reactor is shutting down, I have to:

1) check if the thread is doing things
2) stop it if it is

What's an elegant way to do this? The best I can do is some confused thing like:

def cleanup(self):
    isWorkingDF = defer.Deferred()
    doneDF = defer.Deferred()

    def checkIsWorking():
        res = self.stuff.isWorking() #blocking call
        reactor.callFromThread(isWorkingDF.callback, res)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            def shutdown():
                self.stuff.shutdown()
                reactor.callFromThread(doneDF, None)
            reactor.callInThread(shutdown)                
        else:
            doneDF.callback(None) #no shutdown needed

    isWorkingDF.addCallback(shutdownOrNot)

    reactor.callInThread(checkIsWorking)

    return doneDF

First we check if it's working at all. The result of that callback goes into rescallback which either shuts down or doesn't, and then fires the doneDF, which twisted waits for until closing.

Pretty messed up eh! Is there a better way?

Maybe a related question is, is there a more elegant way to chain callbacks to each other? I could see myself needing to do more cleanup code after this is done, so then I'd have to make a different done deferred, and have the current doneDF fire a callback which does stuff then calls that done deferred..

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

葬花如无物 2024-09-21 10:12:56

啊,真正的答案是使用 defer.inlineCallbacks 装饰器。上面的代码现在变成:

@defer.inlineCallbacks
def procShutdownStuff(self):
    isWorking = yield deferToThread(self.stuff.isWorking)

    if isWorking:
        yield deferToThread(self.stuff.shutdown)

def cleanup(self):
    return self.procShutdownStuff()

Ah the real answer is to use the defer.inlineCallbacks decorator. The above code now becomes:

@defer.inlineCallbacks
def procShutdownStuff(self):
    isWorking = yield deferToThread(self.stuff.isWorking)

    if isWorking:
        yield deferToThread(self.stuff.shutdown)

def cleanup(self):
    return self.procShutdownStuff()
挽清梦 2024-09-21 10:12:56

您可以通过使用 deferToThread 而不是 callInThread/callFromThread 对来简化这一点:

from twisted.internet.threads import deferToThread

def cleanup(self):
    isWorkingDF = deferToThread(self.stuff.isWorking)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            return deferToThread(self.stuff.shutdown)

    isWorkingDF.addCallback(shutdownOrNot)

    return isWorkingDF

deferToThread 基本上只是一个不错的选择围绕您在函数版本中实现两次的相同线程逻辑进行包装。

You can simplify this somewhat by using deferToThread instead of the callInThread/callFromThread pairs:

from twisted.internet.threads import deferToThread

def cleanup(self):
    isWorkingDF = deferToThread(self.stuff.isWorking)

    def shutdownOrNot(isWorking):
        if isWorking:
            #shutdown necessary, shutdown is also a blocking call
            return deferToThread(self.stuff.shutdown)

    isWorkingDF.addCallback(shutdownOrNot)

    return isWorkingDF

deferToThread is basically just a nice wrapper around the same threading logic you had implemented twice in your version of the function.

惜醉颜 2024-09-21 10:12:56

如果程序在关闭反应器后终止,则可以将该线程设置为守护线程。当所有非守护线程终止时,这将自动退出。只需在调用 start() 之前在线程对象上设置 daemon = True 即可。

如果这是不可行的,例如线程必须在退出之前进行资源清理,那么您可以使用队列在反应器和线程之间进行通信。将要完成的工作推送到队列对象上,然后让线程将其拉出并执行。有一个特殊的“FINISH”标记(或简单地“无”)来指示线程需要终止。

If the program is terminating after you shut down the reactor you could make the thread a daemon thread. This will automatically exit when all the non-daemon threads terminate. Just set daemon = True on the thread object before you call start().

If this is not viable, e.g. the thread has to do resource cleanup before it exits then you could communicate between the reactor and the thread with a Queue. Push work to be done onto a Queue object and have the thread pull it off and do it. Have a special "FINISH" token (or simply None) to indicate that the thread needs to terminate.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文