如何在fastapi中使用asyncio.semapahore?

发布于 2025-02-07 10:47:07 字数 1222 浏览 3 评论 0原文

我正在为图像分类模型构建FastAPI服务器。 API接收图像URL并使用HTTPX同时下载它们。我想使用asyncio.semaphores限制服务器上的并发下载次数。我尝试在所有请求中共享一个分类器属性作为分类器属性。但是我会收到错误:获得了未来的未来待处理,从/usr/lib/python3.9/asyncio/base/base_events.py:424>附加到其他循环

这是我的代码:

 
# classifier.py

import asyncio
import httpx


class Classifier():
    def __init__(
        self,
        concurrency_limit,
    ) -> None:
        self.client = httpx.AsyncClient()
        self.semaphore = asyncio.Semaphore(concurrency_limit)

    async def download_async(self, url):
        async with self.semaphore:
            response = await self.client.get(url)

        return await response.aread()

    async def run(
        self, image_urls
    ):
        image_list = await asyncio.gather(
            *[self.download_async(url) for i, url in enumerate(image_urls)]
        )

        # Infer Images
        pass



# api.py
from fastapi import FastAPI

server = FastAPI()

classifier = Classifier(concurrency_limit=5)

@server.post("/")
async def index(urls):
    results = await classifier.run(urls)

在线上提及在运行事件循环中创建信号的答案,但是如果我在run()中创建它,为每个请求创建一个新的信号量。即,每个请求允许同时下载5个。如何在所有请求中使用相同的信号量?

I am building a FastAPI server for an image classification model. The API takes in image urls and downloads them concurrently using httpx. I want to limit the number of concurrent downloads across the server using asyncio.Semaphores. I tried creating a sempahore as a classifier attribute as I need it shared among all requests. But I get the error: got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop.

Here is my code:

 
# classifier.py

import asyncio
import httpx


class Classifier():
    def __init__(
        self,
        concurrency_limit,
    ) -> None:
        self.client = httpx.AsyncClient()
        self.semaphore = asyncio.Semaphore(concurrency_limit)

    async def download_async(self, url):
        async with self.semaphore:
            response = await self.client.get(url)

        return await response.aread()

    async def run(
        self, image_urls
    ):
        image_list = await asyncio.gather(
            *[self.download_async(url) for i, url in enumerate(image_urls)]
        )

        # Infer Images
        pass



# api.py
from fastapi import FastAPI

server = FastAPI()

classifier = Classifier(concurrency_limit=5)

@server.post("/")
async def index(urls):
    results = await classifier.run(urls)

Answers online mention creating a semaphore within a running Event loop, but if I create it within run() a new semaphore is created for every request. i.e each request allows 5 downloads concurrently. How do I use the same semaphore across all requests?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文