fastapi+ apscheduler不同步
我正在尝试设置一个fastapi应用程序进行以下操作:
- 接受消息作为发布请求,并将其列入队列;
- 背景作业不时从队列中拉出消息(最多批量的大小),将其处理成批处理,然后将其存储在字典中;
- 该应用程序正在从字典中检索结果,并将其发送回“完成”。
为此,我已经通过apscheduler通过队列进行通信设置了一个背景作业-incoming-requests-in-batches-b384a1406ec" rel="nofollow noreferrer">https://levelup.gitconnected.com/fastapi-how-to-process-incoming-requests-in-batches-b384a1406ec.这是我应用程序的代码:
import queue
import uuid
from asyncio import sleep
import uvicorn
from pydantic import BaseModel
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
app = FastAPI()
app.input_queue = queue.Queue()
app.output_dict = {}
app.queue_limit = 2
def upper_messages():
for i in range(app.queue_limit):
try:
obj = app.input_queue.get_nowait()
app.output_dict[obj['request_id']] = obj['text'].upper()
except queue.Empty:
pass
app.scheduler = AsyncIOScheduler()
app.scheduler.add_job(upper_messages, 'interval', seconds=5)
app.scheduler.start()
async def get_result(request_id):
while True:
if request_id in app.output_dict:
result = app.output_dict[request_id]
del app.output_dict[request_id]
return result
await sleep(0.001)
class Payload(BaseModel):
text: str
@app.post('/upper')
async def upper(payload: Payload):
request_id = str(uuid.uuid4())
app.input_queue.put({'text': payload.text, 'request_id': request_id})
return await get_result(request_id)
if __name__ == "__main__":
uvicorn.run(app)
但是它并没有真正异步运行;如果我调用以下测试脚本:
from time import time
import requests
texts = [
'text1',
'text2',
'text3',
'text4'
]
time_start = time()
for text in texts:
result = requests.post('http://127.0.0.1:8000/upper', json={'text': text})
print(result.text, time() - time_start)
消息确实会处理,但是整个处理需要15-20秒,输出是类似的:
"TEXT1" 2.961090087890625
"TEXT2" 7.96642279624939
"TEXT3" 12.962305784225464
"TEXT4" 17.96261429786682
我期望整个处理时间为5-10秒(少于5秒后前两条消息应处理,另外两条或多或少恰好在5秒后)。相反,在处理第一个消息之前,似乎没有将第二个消息放在队列中 - 即就像我只是在使用单个线程一样。
问题:
- 有人知道如何修改上面的代码,以便在收到它们后立即将所有传入的消息立即发送给队列?
- [奖励问题1]:以上内容是通过
uvicorn debug_app:app
从命令行中运行脚本(例如debug_app.py
),将保持真实。但是,如果我使用python3 debug_app.py
运行它,根本没有返回消息。收到消息(执行CTRL+C会导致等待连接关闭。(Ctrl+C强制退出)
),但从未处理过。 - [奖励问题2]:我不明白的另一件事是为什么,如果我删除
等待睡眠(0.001)
get_result
内部,则行为甚至可以得到更糟糕的是:无论我做什么,应用程序都冻结,我都无法终止它(即Ctrl+C也不是kill
工作),我必须发送一个Sigkill(kill -9
)停止它。
背景 如果您想知道我为什么这样做,例如上面链接的博客文章,目的是进行有效的深度学习推断。我(大致)同一时间处理一个或十二个请求的模型,因此批处理可以大大增加吞吐量。我首先尝试设置FastApi Frontend + RabbitMQ + Blask Baskend Pipeline,它起作用了,但是复杂的设置的开销(和/或我无法与之合作)使开销比刚刚计算的时间更重。模型,使收益无效...因此,我首先试图获得一个简约的版本来工作。在此玩具示例中的upper_messages
方法将成为模型的直接调用(如果此计算 - 估算步骤不会过多阻止传入的连接),或者是对实际执行计算的另一个过程的异步调用 - 我'稍后再看...
I am trying to set up a fastAPI app doing the following:
- Accept messages as post requests and put them in a queue;
- A background job is, from time to time, pulling messages (up to a certain batch size) from the queue, processing them in a batch, and storing results in a dictionary;
- The app is retrieving results from the dictionary and sending them back "as soon as" they are done.
To do so, I've set up a background job with apscheduler communicating via a queue trying to make a simplified version of this post: https://levelup.gitconnected.com/fastapi-how-to-process-incoming-requests-in-batches-b384a1406ec. Here is the code of my app:
import queue
import uuid
from asyncio import sleep
import uvicorn
from pydantic import BaseModel
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
app = FastAPI()
app.input_queue = queue.Queue()
app.output_dict = {}
app.queue_limit = 2
def upper_messages():
for i in range(app.queue_limit):
try:
obj = app.input_queue.get_nowait()
app.output_dict[obj['request_id']] = obj['text'].upper()
except queue.Empty:
pass
app.scheduler = AsyncIOScheduler()
app.scheduler.add_job(upper_messages, 'interval', seconds=5)
app.scheduler.start()
async def get_result(request_id):
while True:
if request_id in app.output_dict:
result = app.output_dict[request_id]
del app.output_dict[request_id]
return result
await sleep(0.001)
class Payload(BaseModel):
text: str
@app.post('/upper')
async def upper(payload: Payload):
request_id = str(uuid.uuid4())
app.input_queue.put({'text': payload.text, 'request_id': request_id})
return await get_result(request_id)
if __name__ == "__main__":
uvicorn.run(app)
however it's not really running asynchronously; if I invoke the following test script:
from time import time
import requests
texts = [
'text1',
'text2',
'text3',
'text4'
]
time_start = time()
for text in texts:
result = requests.post('http://127.0.0.1:8000/upper', json={'text': text})
print(result.text, time() - time_start)
the messages do get processed, but the whole processing takes 15-20 seconds, the output being something like:
"TEXT1" 2.961090087890625
"TEXT2" 7.96642279624939
"TEXT3" 12.962305784225464
"TEXT4" 17.96261429786682
I was instead expecting the whole processing to take 5-10 seconds (after less than 5 seconds the first two messages should be processed, and the other two more or less exactly 5 seconds later). It seems instead that the second message is not being put to the queue until the first one is processed - i.e. the same as if I were just using a single thread.
Questions:
- Does anyone know how to modify the code above so that all the incoming messages are put to the queue immediately upon receiving them?
- [bonus question 1]: The above holds true if I run the script (say,
debug_app.py
) from the command line viauvicorn debug_app:app
. But if I run it withpython3 debug_app.py
no message is returned at all. Messages are received (doing CTRL+C results inWaiting for connections to close. (CTRL+C to force quit)
) but never processed. - [bonus question 2]: Another thing I don't understand is why, if I remove the line
await sleep(0.001)
inside the definition ofget_result
, the behaviour gets even worse: no matter what I do, the app freezes, I cannot terminate it (i.e. neither CTRL+C norkill
work), I have to send a sigkill (kill -9
) to stop it.
Background
If you are wondering why I am doing this, like in the blog post linked above, the purpose is to do efficient deep learning inference. The model I have takes (roughly) the same time processing one or a dozen requests at the same time, so batching can dramatically increase throughput. I first tried setting up a fastAPI frontend + RabbitMQ + Flask backend pipeline, and it worked, but the overhead of the complicated setup (and/or my inability of working with it) made the overhead heavier than the time it just took to compute the model, nullifying the gain... so I'm first trying to get a minimalistic version to work. The upper_messages
method in this toy example will become either directly invocation of the model (if this computational-heavier step is not blocking incoming connections too much) or an async call to another process actually doing the computations - I'll see about that later...
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
...在研究更好的情况之后,看起来该应用程序确实在我想要的,我的错误是我对其进行测试的方式...
的确,当将POST请求发送到Uvicorn Server时,客户端是剩下的等待答案来了 - 这是预期的行为。当然,这也意味着在收集第一个答案之前,不会发送下一个请求。因此,服务器没有批处理它们,因为没有什么可批量的!
要正确测试此操作,我将
test.py
脚本稍有更改为:并通过多个进程运行:
输出如预期,并在此中处理了成对的消息(来自不同用户!)一批(确切的顺序是随机的,尽管同一用户当然会按照其提出的请求顺序获得答案):
我将问题打开(并且不接受我自己的答案),因为上面的“奖励问题”(关于申请变得冻结)我仍然没有答案。
... after looking better into it, it looks like the application was indeed working as I wanted it to, my error was in the way I tested it...
Indeed, when sending a POST request to the uvicorn server, the client is left waiting for an answer to come - which is intended behaviour. Of course, this also means, however, is that the next request is not sent until the first answer is collected. So the server is not batching them because there's nothing to batch!
To test this correctly, I slightly altered the
test.py
script to:And run it in multiple processes via:
The output is now as expected, with pairs of messages (from different users!) being processed in a batch (and the exact order is a bit randomized, although the same user gets, of course, answers in the order of the requests it made):
I'm leaving the question open (and not accepting my own answer) because for the "bonus questions" above (about the application becoming frozen) I still don't have an answer.