如何在不阻塞系统的情况下给出延迟的 AMP 回复?

发布于 2024-11-28 18:50:00 字数 1871 浏览 0 评论 0原文

(我非常乐意接受有关更好标题的建议。)

我正在使用 Twisted 上的 AMP 协议来创建一个调度程序,向其代理提供作业。代理从调度程序中提取作业,因此调度程序是 AMP 服务器,代理作为客户端进行连接。

这个想法是让代理进行连接,从(内部调度程序)作业队列的顶部拾取作业,然后继续执行它。但是,不能保证该队列始终非空。因此,我希望利用扭曲的延迟机制,以便在调度程序设法从队列中弹出作业时,在代理端简单地进行延迟触发。

不过,在调度程序端实现这一点被证明有点棘手。 AMP 的工作方式是为代理可以发送的每个(由我预定义的)命令分配一个函数,该函数获取该命令具有的所有参数并返回其返回的所有值的字典。这意味着我需要在一个函数内完成这一切。通常情况下,这不会是一个问题,但这里扭曲似乎妨碍了我:我需要让函数暂停一会儿,而不暂停扭曲事件循环,从而允许它实际上向队列添加更多作业,所以一个可以弹出。 (这就是我认为通常的 sleep() 不会达到预期效果的原因。)更重要的是,这意味着我想不出一种方法来使用一些扭曲的功能,例如 < code>deferToThread(),因为我必须在单独的函数中处理该结果(并且只能访问它们),我将其分配为该 deferred 的回调,所以我不知道要返回什么启动单独的线程并分配其回调后的 AMP 响应程序函数。这更清楚地说明了我的意思:(

def assignJob(agentID):
    # We expect the agentID, so we can store who we've given a job to.

    # Get a job without blocking even if the queue is originally empty.
    job = None
    while job is None:
        try:
            job = jobqueue.pop(0)
        except IndexError:
            # Imagine getJob simply tries to get a job every 5 seconds
            # (using sleep() safely because it's in a separate thread)
            # until it eventually gets one, which it returns
            d = deferToThread(getJob)

            # We would then need to have a separate function
            # , e.g. jobReturn() pick up the firing deferred and do
            # something with the result...
            d.addCallback(jobReturn)   

    # But if we do... We don't (necessarily) have a job to return here
    # because for all we know, the deferred from that thread hasn't even
    # fired yet.
    return {'job': ???}

这显然不是该函数的实际完整代码 - 首先,它是所需的 amp.AMP 子类的方法。)

reactor 方法callInThread() 起初似乎也很有用(因为它不返回延迟),但它没有提供一种方法来获取它执行的可调用函数的返回值(据我所知)可以看到)并且,即使确实如此,这意味着等待线程完成,这会阻塞此方法很长时间,这使得使用单独的线程毫无意义。

那么,在我找到工作之前,如何阻止此方法,而不是整个 Twisted 事件循环,或者如何在其立即响应程序方法之外返回 AMP 回复?

(I'm very open to suggestions for a better title.)

I am using the AMP protocol over Twisted to create a scheduler that feeds out jobs to its agents. The agents pull jobs from the scheduler, so the scheduler is an AMP server and the agents connect as clients.

The idea is for an agent to connect, pick up a job from the top of the (internal scheduler) job queue and then go on about its way executing it. However, that queue is not guaranteed to be always non-empty. Thus, I am looking to take advantage of the twisted deferred mechanic in order to simply have a deferred fire on the agent's side when the scheduler has managed to pop off a job from the queue.

Implementing this on the scheduler side is proving a bit tricky, though. The way that AMP works is by assigning a function to each (predefined by me) command that the agent can send, with the function taking all the arguments that the command has and returning a dictionary of all the values it returns. This means that I need to do this all from within one function. Normally, this would not be an issue, but here twisted appears to get in my way: I need to have the function pause for a bit, without pausing the twisted event loop thus allowing it to actually add more jobs to the queue, so one can be popped off. (This is the reason I don't think the usual sleep() will have the desired effect.) More importantly, it means I can't think of a way to use some twisted functionality, e.g. deferToThread(), because I would have to handle the results from that (and only have access to them) in the separate function that I would assign as that deferred's callback, so I wouldn't know what to return in the AMP responder function after launching off the separate thread and assigning its callback. This illustrates what I mean a bit more clearly:

def assignJob(agentID):
    # We expect the agentID, so we can store who we've given a job to.

    # Get a job without blocking even if the queue is originally empty.
    job = None
    while job is None:
        try:
            job = jobqueue.pop(0)
        except IndexError:
            # Imagine getJob simply tries to get a job every 5 seconds
            # (using sleep() safely because it's in a separate thread)
            # until it eventually gets one, which it returns
            d = deferToThread(getJob)

            # We would then need to have a separate function
            # , e.g. jobReturn() pick up the firing deferred and do
            # something with the result...
            d.addCallback(jobReturn)   

    # But if we do... We don't (necessarily) have a job to return here
    # because for all we know, the deferred from that thread hasn't even
    # fired yet.
    return {'job': ???}

(This is obviously not the actual full code for the function - for one, it's a method to a subclass of amp.AMP as required.)

The reactor method callInThread() also seems useful at first (since it doesn't return a deferred), but it doesn't offer a way to get the return value of the callable that it executes (as far as I can see) and, even if it did, that would mean waiting for the thread to finish, which would block this method for as long, which makes using a separate thread pointless.

So how do I block this method until I have a job, but not the whole Twisted event loop or, alternatively, how do I return an AMP reply outside of its immediate responder method?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

债姬 2024-12-05 18:50:00

您可能错过的一件事是,AMP 响应程序方法本身也允许返回 Deferred(在 AMP API 文档)。只要 Deferred 最终使用与命令的响应定义相匹配的字典来触发,一切都会正常工作。

也有些相关,如果您想避免使用线程,您可能需要查看 twisted.internet.defer.DeferredQueue,一种原生了解 Deferred 的队列数据结构。

One thing you might have missed is that an AMP responder method itself is also allowed to return a Deferred (search for may also return Deferreds in the AMP API docs). As long as the Deferred eventually fires with a dictionary which matches up with the command's response definition, everything will work fine.

Also somewhat related, if you want to avoid using threads, you might want to take a look at twisted.internet.defer.DeferredQueue, a queue data structure which knows about Deferreds natively.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文