Queuing remote calls to a Python Twisted Perspective Broker?
The strength of Twisted (for Python) is its asynchronous framework (I think). I've written an image processing server that takes requests via Perspective Broker. It works great as long as I feed it fewer than a couple hundred images at a time. However, sometimes it gets spiked with hundreds of images at virtually the same time. Because it tries to process them all concurrently, the server crashes.

As a solution, I'd like to queue up the remote_calls on the server so that it only processes ~100 images at a time. It seems like this might be something that Twisted already does, but I can't seem to find it. Any ideas on how to start implementing this? A point in the right direction? Thanks!
One ready-made option that might help with this is `twisted.internet.defer.DeferredSemaphore`. This is the asynchronous version of the normal (counting) semaphore you might already know if you've done much threaded programming.

A (counting) semaphore is a lot like a mutex (a lock). But where a mutex can only be acquired once until a corresponding release, a (counting) semaphore can be configured to allow an arbitrary (but specified) number of acquisitions to succeed before any corresponding releases are required.
For example, `DeferredSemaphore` can be used to run ten asynchronous operations while allowing at most three of them to run at once.

`DeferredSemaphore` also has explicit `acquire` and `release` methods, but the `run` method is so convenient it's almost always what you want. It calls the `acquire` method, which returns a `Deferred`. To that first `Deferred`, it adds a callback which calls the function you passed in (along with any positional or keyword arguments). If that function returns a `Deferred`, then to that second `Deferred` a callback is added which calls the `release` method.

The synchronous case is handled as well, by immediately calling `release`. Errors are also handled, by allowing them to propagate but making sure the necessary `release` is done to leave the `DeferredSemaphore` in a consistent state. The result of the function passed to `run` (or the result of the `Deferred` it returns) becomes the result of the `Deferred` returned by `run`.
Another possible approach might be based on `DeferredQueue` and `cooperate`. `DeferredQueue` is mostly like a normal queue, but its `get` method returns a `Deferred`. If there happen to be no items in the queue at the time of the call, the `Deferred` will not fire until an item is added.

Here's an example:
Note that the `async` worker function is the same as the one from the first example. However, this time, there's also a `worker` function which is explicitly pulling jobs out of the `DeferredQueue` and processing them with `async` (by adding `async` as a callback to the `Deferred` returned by `get`). The `worker` generator is driven by `cooperate`, which iterates it once after each `Deferred` it yields fires. The main loop, then, starts three of these worker generators so that three jobs will be in progress at any given time.

This approach involves a bit more code than the `DeferredSemaphore` approach, but has some benefits which may be interesting. First, `cooperate` returns a `CooperativeTask` instance which has useful methods like `pause`, `resume`, and a couple of others. Also, all jobs assigned to the same cooperator will cooperate with each other in scheduling, so as not to overload the event loop (and this is what gives the API its name). On the `DeferredQueue` side, it's also possible to set limits on how many items are pending processing (via its `size` argument), so you can avoid completely overloading your server (for example, if your image processors get stuck and stop completing tasks). If the code calling `put` handles the queue overflow exception (`QueueOverflow`), you can use this as pressure to try to stop accepting new jobs (perhaps shunting them to another server, or alerting an administrator). Doing similar things with `DeferredSemaphore` is a bit trickier, since there's no way to limit how many jobs are waiting to be able to acquire the semaphore.
You might also like the txRDQ (Resizable Dispatch Queue) I wrote. Google it; it's in the tx collection on Launchpad. Sorry I don't have more time to reply, I'm about to go onstage.
Terry