在 fastapi 应用程序中使用来自 kafka 主题的消息并将其发送给特定客户端

发布于 2025-01-09 16:15:17 字数 546 浏览 1 评论 0原文

我拥有的。

一个FastAPI应用程序,由两个子应用程序组成,并安装了特定路径。其中之一是 websocket 应用程序。 具有两个主题的单节点 Kafka 集群。一个用于 websocket 应用程序写入,另一个用于 websocket 应用程序读取。 一个使用 socket.io 客户端连接到 FastAPI 应用程序的 React 应用程序。特别是 websocket 子应用程序。

我想要实现的目标

我可以从 websocket 子应用程序写入 Kafka 主题,因为它很简单。每个连接到该应用程序的用户都会获得一个套接字。我将每个套接字 ID 映射到用户标识符并存储它以供以后使用。生产者生成消息后,另一个应用程序会对数据进行一些其他处理,然后写入一个主题,我必须从中读取消息。本质上是一个消费者。在消息中,我有一个用户标识符,可以使用它来检测套接字 ID。

现在,我不确定如何以及何时启动 FastAPI 应用程序的消费者。它需要是一个独立的模块吗?如果是,我如何将响应发送回原始 websocket 客户端。 [我存储了套接字 ID。]

What I have.

A FastAPI app which consists of two sub apps and are mounted and specific paths. One of them is a websocket app.
A single node Kafka cluster with two topics. One to which the websocket app writes and other from which the websocket app reads.
A react application that uses socket.io client to make connection to the FastAPI app. Specifically the websocket sub app.

What I am trying to achieve

I am able to write to the Kafka topic from the websocket sub app as it is straightforward. Every user that connects to the app is given a socket it. I am mapping every socket id to the user identifier and storing it for later use. After the producer produces the message there is some other processing that is done on the data by another app and then it writes to a topic from which I have to read the messages. Basically a consumer. In the message I have a user identifier using which I can detect the socket id.

Now, I am unsure as to how to and when to start my consumer with respect to a FastAPI application. Does it need to be a standalone module? If yes, how do I send back the response to the original websocket client. [I have socket id stored.]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

戏蝶舞 2025-01-16 16:15:17

首先,这是一个常见问题,因为您实际上已经在轮询循环中获得了消费者,然后一旦获得事件就进行发布,这肯定会影响依赖于负载的事件循环的性能。
但是,假设您使用了部分权利,那么这是可行的。

在此示例中,您可以重新工具AIOKafka 消费者需要通过套接字进行发布。您必须维护套接字 ID 到记录 ID 的映射(使用 redis)。

但这仍然给单个快速 API 工作线程(假设gunicon)添加了很多东西,并且您需要非常仔细地观察消费者组和分区,以确保消息真正到达正确的用户。

就我个人而言,我会将其与一个单独的系统解耦,以处理向客户端的实时发送,或者使用 托管 websocket pub-sub 系统,或使用类似 faust-streaming

免责声明:我写了链接博客,在 Ably 工作。

Firstly this is a common issue, because you've effectively got a consumer in a polling loop that is then publishing once it gets an event, can definitely impact the performance of the event-loop dependant on load.
However, it's doable, assuming you use the rights bits and pieces.

In this example, you could re-tool the AIOKafka consumer to publish over the socket required. You'll have to maintain a mapping of socket ID to record ID (use redis).

BUT this is still adding a lot of things to a single fast API worker (assuming gunicon) and you'd need to watch consumer groups and partitions very carefully to make sure messages actually get to the right user.

Personally, I'd decouple this with a separate system to handle the realtime sending to the client, either with a hosted websocket pub-sub system, or using something like faust-streaming

Disclaimer: I wrote the linked blog, working at Ably.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文