夸脱和psycopg3 asyncconnection Notifies() - > SSE多个客户

发布于 2025-01-25 16:39:11 字数 2351 浏览 5 评论 0原文

我正在尝试创建一个SSE Server Quart应用程序,该应用程序会从PostgreSQL实例中聆听通知(然后将SSE消息发送到注册EventsOrces)。

https://pgjones.gitlab.io/quart/tutororials/ Broadcast_Tutorial.html 工作,在此工作,

@app.route('/sse')
async def sse():
    async def send_events():
        while True:
            await asyncio.sleep(60)
            event = ServerSentEvent('heartbeat')
            yield event.encode()

同时为一个以上的客户(TAB)工作(宣布该端口js中的Eventsource。)。完整的示例将帖子后的消息发送到其他路线,尽管这不是我的目标。

从逻辑上讲,应用程序应该连接到PostgreSQL实例并收听通知,然后订阅/SSE的这些Eventsource应定期接收SSE消息。

读取 https://wwwwwww.psycopg.org/psycopg3/psycopg3/docs/docs/ddocs/advanced/advanced/ashync.html? 如果同时同时,请尝试

@app.before_serving
async def initialize():
    app.db_conn = await psycopg.AsyncConnection.connect(connectionstring, autocommit=True)
    curs = app.db_conn.cursor()
    await curs.execute("LISTEN ow_insert_event")

一次

@app.route('/sse')
async def sse():
    async def send_events():
        while True:
            gen = app.db_conn.notifies()
            async for notify in gen:
                event = ServerSentEvent(notify)
                yield event.encode()

对一个客户使用一次;第二个是继续聆听的,即使应用程序收到了新的通知,第一个也不会再收到消息。如果只有一个客户端不断起作用,则有人会通知()发电机和一个消费者所述发电机。

有了两个客户,创建了两个发电机,但其中只有一个发电机似乎抓住了通知,也许是意图的?我在psycopg3文档中还没有看到有关相同连接的几个通知()async发电机(几个同步生成器和循环阻止的障碍)的任何内容,

如果我让the fort for the fore the the the the the the the the the async Generator with notifies() AT /SSE使用相同的生成器,然后问题是 - 校正 - 与众不同:“ RuntimeError:andext():Asynchronous Generator已经在运行”。进一步的阅读表明,不可能并行消耗同样的异步生成器: httpps://twistedmatrix.com /trac/ticket/9805

是否会收到任何明显错误的亮点。

如果只有一个与PsyCopG3连接的DB连接的通知()async Generator,则 /SSE路由无法直接消耗此发电机。 (“在发电机中通知的async:”在 @app.before_serving中无法正常工作,因为它阻止了应用程序启动)

I am trying to create an SSE server Quart application that listens to notifications from a postgresql instance (and then sends an SSE message to registered EventSources).

Have gotten a simplified version of https://pgjones.gitlab.io/quart/tutorials/broadcast_tutorial.html to work, where

@app.route('/sse')
async def sse():
    async def send_events():
        while True:
            await asyncio.sleep(60)
            event = ServerSentEvent('heartbeat')
            yield event.encode()

works for more than one client (tab) at the same time (declaring that route the EventSource in the client js.) . The complete example sends messages after a post to a different route though which isn't my goal.

Logically the app should connect once to the postgresql instance and listen for notifications and then those EventSources subscribed to /sse should periodically receive SSE messages.

Read https://www.psycopg.org/psycopg3/docs/advanced/async.html?highlight=async and tried

@app.before_serving
async def initialize():
    app.db_conn = await psycopg.AsyncConnection.connect(connectionstring, autocommit=True)
    curs = app.db_conn.cursor()
    await curs.execute("LISTEN ow_insert_event")

together with

@app.route('/sse')
async def sse():
    async def send_events():
        while True:
            gen = app.db_conn.notifies()
            async for notify in gen:
                event = ServerSentEvent(notify)
                yield event.encode()

which works for one client once if two clients simultaneously; the second keeps listening and the first receives no more messages even if new notifications are received by the app. If there's just one client however it works continually, one notifies() generator and one consumer of said generator.

With two clients two generators are created but only one of them seems to catch notify's, maybe this is intended? I haven't seen anything in the psycopg3 docs about several notifies() async generators for the same connection (several sync generators and blocking for loops make less sense)

If I let before_serving create the async generator with .notifies() so that all clients at /sse use the same generator then the issue is -- correction -- not the same: "RuntimeError: anext(): asynchronous generator is already running" happens then. Further reading suggests it isn't possible to consume the same async generator in parallel: https://twistedmatrix.com/trac/ticket/9805

Would appreciate if could receive highlights of any obvious errors.

If there's only meant to be one notifies() async generator for one db connection with psycopg3 then the /sse route cannot consume this generator directly. (The "async for notify in generator:" doesn't work well in @app.before_serving as it prevents the app from starting)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文