如何在fastapi中使用asyncio.semapahore?
我正在为图像分类模型构建FastAPI服务器。 API接收图像URL并使用HTTPX同时下载它们。我想使用asyncio.semaphores限制服务器上的并发下载次数。我尝试在所有请求中共享一个分类器属性作为分类器属性。但是我会收到错误:获得了未来的未来待处理,从/usr/lib/python3.9/asyncio/base/base_events.py:424>附加到其他循环
。
这是我的代码:
# classifier.py
import asyncio
import httpx
class Classifier():
def __init__(
self,
concurrency_limit,
) -> None:
self.client = httpx.AsyncClient()
self.semaphore = asyncio.Semaphore(concurrency_limit)
async def download_async(self, url):
async with self.semaphore:
response = await self.client.get(url)
return await response.aread()
async def run(
self, image_urls
):
image_list = await asyncio.gather(
*[self.download_async(url) for i, url in enumerate(image_urls)]
)
# Infer Images
pass
# api.py
from fastapi import FastAPI
server = FastAPI()
classifier = Classifier(concurrency_limit=5)
@server.post("/")
async def index(urls):
results = await classifier.run(urls)
在线上提及在运行事件循环中创建信号的答案,但是如果我在run()
中创建它,为每个请求创建一个新的信号量。即,每个请求允许同时下载5个。如何在所有请求中使用相同的信号量?
I am building a FastAPI server for an image classification model. The API takes in image urls and downloads them concurrently using httpx. I want to limit the number of concurrent downloads across the server using asyncio.Semaphores. I tried creating a sempahore as a classifier attribute as I need it shared among all requests. But I get the error: got Future <Future pending created at /usr/lib/python3.9/asyncio/base_events.py:424> attached to a different loop
.
Here is my code:
# classifier.py
import asyncio
import httpx
class Classifier():
def __init__(
self,
concurrency_limit,
) -> None:
self.client = httpx.AsyncClient()
self.semaphore = asyncio.Semaphore(concurrency_limit)
async def download_async(self, url):
async with self.semaphore:
response = await self.client.get(url)
return await response.aread()
async def run(
self, image_urls
):
image_list = await asyncio.gather(
*[self.download_async(url) for i, url in enumerate(image_urls)]
)
# Infer Images
pass
# api.py
from fastapi import FastAPI
server = FastAPI()
classifier = Classifier(concurrency_limit=5)
@server.post("/")
async def index(urls):
results = await classifier.run(urls)
Answers online mention creating a semaphore within a running Event loop, but if I create it within run()
a new semaphore is created for every request. i.e each request allows 5 downloads concurrently. How do I use the same semaphore across all requests?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论