使用 django 和 kombu 流式传输视图

发布于 2024-11-14 21:23:04 字数 713 浏览 3 评论 0原文

我在昆布有两个队列;一个用于提交请求(做某事),另一个通过发布/订阅吐出所述请求的增量状态。因此在我的过程中它将发布到请求队列并在响应队列上消费。由于该任务可能需要一些时间,我想向用户提供有关后端发生的情况的反馈;这一切都在命令行上运行,因为我的 kombu consume 回调允许我添加一个 logging.info() 语句来向我的用户返回信息:

def callback( msg, env ):
    logging.info( str(msg) )

consumer.register_callback( callback )
consumer.consume()
while continue_consuming:
    connection.drain_events()

但是,我现在希望能够在 django 中提供相同的功能。我知道我可以创建一个 generator 函数作为 HttpResponse 对象的输入:

def view( reqeust ):
    HttpResponse( gen() )

def gen():
    yield 'streaming... '

但我无法概念化如何将 kombu 队列的消息回调实现到生成器中提供这个...有什么想法吗?

如果可能的话,我想避免使用数据库层来存储进度/结果。

i have two queues in kombu; one to submit requests (to do something) and another that spits out via pub/sub the incremental status of said request. therefore in my process it will publish to the request queue and consume on the response queue. as the task may take some time, i want to provide the user with feedback as to what is happening in the backend; it all works on the command line, as my kombu consume callback allows me to, say, add a logging.info() statement to spit back information to my user:

def callback( msg, env ):
    logging.info( str(msg) )

consumer.register_callback( callback )
consumer.consume()
while continue_consuming:
    connection.drain_events()

however, i now want to be able to provide the same functionality in django. i understand that i can create a generator function as the input to a HttpResponse object:

def view( reqeust ):
    HttpResponse( gen() )

def gen():
    yield 'streaming... '

but i cannot conceptualize how i could implement the message callback of the kombu queue into a generator to provide this... any ideas?

i want to avoid having to use a database layer to store the progress/results if possible.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

柠檬心 2024-11-21 21:23:04

最后我决定稍微重构一下代码;因为我有一个昆布队列的包装器,以使接口更像 multiprocess.Queue ,所以我为我的 get() 方法创建了一个生成器。

def get( self, until=None ):
    if until == None:
        until = self.end_marker
    for c in count():
        m = self.consumer.queues[0].get( True )
        if not m == None:
            if m.payload == until:
                raise StopIteration
            yield m.payload

这似乎工作正常 - 但并不是那么干净,因为我需要知道 self.end_marker 或 util 是什么,并且还可能想要迭代所有消费者队列(但我的类无论如何都是每个对象队列,所以这还不错)

那么我在我看来所做的就是:

 def view( response ):
     q = Queue()
     return HttpResponse( q.get() )

有很多关于各种中间件的帖子;我只是懒得使用它们,而且看起来效果很好。

in the end i decided to restructure the code a bit; as i had a wrapper around a kombu queue to make the interface more multiprocess.Queue like, i created a generator for my get() method.

def get( self, until=None ):
    if until == None:
        until = self.end_marker
    for c in count():
        m = self.consumer.queues[0].get( True )
        if not m == None:
            if m.payload == until:
                raise StopIteration
            yield m.payload

this appears to work fine - but not all that clean as i have a need to know what self.end_marker or util is, and also might want to iterate through all the consumer queues (but my class is queue per object anyway, so that's not too bad)

then all i do in my view is:

 def view( response ):
     q = Queue()
     return HttpResponse( q.get() )

there's numerous posts about various middleware getting in the way; i just don't bother using them and it seems to work fine.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文