Queuing remote calls to a Python Twisted Perspective Broker?

Posted 2024-09-01 21:54:14

The strength of Twisted (for python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it less than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently the server crashes.

As a solution I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!

2 Answers

忘东忘西忘不掉你 2024-09-08 21:54:14

One ready-made option that might help with this is twisted.internet.defer.DeferredSemaphore. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.

A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.
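The counting behavior can be illustrated with the stdlib's `threading.Semaphore` (used here purely for comparison, not part of Twisted):

```python
import threading

# A counting semaphore initialized to 3 allows three acquisitions
# before any release is needed; a mutex (Lock) would allow only one.
sem = threading.Semaphore(3)

results = [sem.acquire(blocking=False) for _ in range(4)]
print(results)  # [True, True, True, False] -- the fourth acquire fails

sem.release()                        # return one permit
print(sem.acquire(blocking=False))   # True -- one more acquire succeeds
```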

Here's an example of using DeferredSemaphore to run ten asynchronous operations, but to run at most three of them at once:

from twisted.internet.defer import DeferredSemaphore, gatherResults
from twisted.internet.task import deferLater
from twisted.internet import reactor


def async_job(n):
    # Simulate an asynchronous job that takes n seconds to finish.
    # (Renamed from the original `async`, which is a reserved word
    # in Python 3; prints also updated to Python 3 syntax.)
    print('Starting job', n)
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print('Finishing job', n)
    d.addCallback(cbFinished)
    return d


def main():
    sem = DeferredSemaphore(3)

    jobs = []
    for i in range(10):
        jobs.append(sem.run(async_job, i))

    d = gatherResults(jobs)
    d.addCallback(lambda ignored: reactor.stop())
    reactor.run()


if __name__ == '__main__':
    main()

DeferredSemaphore also has explicit acquire and release methods, but the run method is so convenient it's almost always what you want. It calls the acquire method, which returns a Deferred. To that first Deferred, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a Deferred, then to that second Deferred a callback is added which calls the release method.

The synchronous case is handled as well, by immediately calling release. Errors are also handled, by allowing them to propagate but making sure the necessary release is done to leave the DeferredSemaphore in a consistent state. The result of the function passed to run (or the result of the Deferred it returns) becomes the result of the Deferred returned by run.
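The acquire/call/release discipline that `run` implements can be sketched with the stdlib's asyncio (a rough analogue for illustration, not Twisted's actual implementation): acquire the semaphore, call the function, always release even on error, and propagate the result or exception.

```python
import asyncio

async def run_limited(sem, f, *args, **kwargs):
    # Rough analogue of DeferredSemaphore.run: acquire, call the
    # function, and always release (even if it raises), propagating
    # the result or the error to the caller.
    async with sem:
        result = f(*args, **kwargs)
        if asyncio.iscoroutine(result):
            # Asynchronous case: wait for the coroutine to finish.
            result = await result
        return result

async def main():
    sem = asyncio.Semaphore(3)
    # Ten jobs, at most three inside the semaphore at any moment.
    results = await asyncio.gather(
        *(run_limited(sem, asyncio.sleep, 0.01, result=i) for i in range(10))
    )
    print(results)  # [0, 1, 2, ..., 9]

asyncio.run(main())
```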

Another possible approach might be based on DeferredQueue and cooperate. DeferredQueue is mostly like a normal queue, but its get method returns a Deferred. If there happen to be no items in the queue at the time of the call, the Deferred will not fire until an item is added.

Here's an example:

from random import randrange

from twisted.internet.defer import DeferredQueue
from twisted.internet.task import deferLater, cooperate
from twisted.internet import reactor


def async_job(n):
    # Simulate an asynchronous job that takes n seconds to finish
    # (renamed from `async`, a reserved word in Python 3).
    print('Starting job', n)
    d = deferLater(reactor, n, lambda: None)
    def cbFinished(ignored):
        print('Finishing job', n)
    d.addCallback(cbFinished)
    return d


def assign(jobs):
    # Periodically create new jobs to be processed.
    jobs.put(randrange(10))
    reactor.callLater(randrange(10), assign, jobs)


def worker(jobs):
    while True:
        yield jobs.get().addCallback(async_job)


def main():
    jobs = DeferredQueue()

    for i in range(10):
        jobs.put(i)

    assign(jobs)

    for i in range(3):
        cooperate(worker(jobs))

    reactor.run()


if __name__ == '__main__':
    main()

Note that the job-starting helper is the same as the one from the first example. This time, however, there's also a worker function which explicitly pulls jobs out of the DeferredQueue and processes them with that helper (by adding it as a callback to the Deferred returned by get). The worker generator is driven by cooperate, which iterates it once after each Deferred it yields fires. The main loop then starts three of these worker generators so that three jobs will be in progress at any given time.

This approach involves a bit more code than the DeferredSemaphore approach, but has some benefits which may be interesting. First, cooperate returns a CooperativeTask instance which has useful methods like pause, resume, and a couple others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the DeferredQueue side, it's also possible to set limits on how many items are pending processing, so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling put handles the queue overflow exception, you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with DeferredSemaphore is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.
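The bounded-queue backpressure idea can be sketched with the stdlib's `asyncio.Queue` (again an analogue for illustration; with Twisted you would instead construct the DeferredQueue with a size limit and handle its overflow exception at the call to put):

```python
import asyncio

# A queue bounded at 2 pending items; put_nowait raises QueueFull
# once the bound is reached, which the producer can treat as
# backpressure (reject the job, shunt it to another server,
# or alert an administrator).
jobs = asyncio.Queue(maxsize=2)

accepted, rejected = [], []
for i in range(5):
    try:
        jobs.put_nowait(i)
        accepted.append(i)
    except asyncio.QueueFull:
        rejected.append(i)

print(accepted)  # [0, 1]
print(rejected)  # [2, 3, 4]
```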

筱武穆 2024-09-08 21:54:14

You might also like the txRDQ (Resizable Dispatch Queue) I wrote. Google it, it's in the tx collection on LaunchPad. Sorry I don't have more time to reply - about to go onstage.

Terry
