FastAPI:拒绝带有 HTTP 响应的 WebSocket 连接

发布于 2025-01-09 13:11:24 字数 606 浏览 1 评论 0原文

在基于 FastAPI 的 Web 应用程序中,我有一个 WebSocket 端点,仅当满足某些条件时才应允许连接,否则它应返回 HTTP 404 回复,而不是使用 HTTP 101< 升级连接/代码>。

据我了解,协议完全支持这一点,但我找不到任何方法可以使用 FastAPI 或 Starlette 来做到这一点。

如果我有类似的情况:

@router.websocket("/foo")
async def ws_foo(request: WebSocket):
    if _user_is_allowed(request):
        await request.accept()
        _handle_ws_connection(request)
    else:
        raise HTTPException(status_code=404)

异常不会转换为 404 响应,因为 FastAPI 的 ExceptionMiddleware 似乎无法处理此类情况。

是否有任何本机/内置方式支持这种“拒绝”流程?

In a FastAPI based Web app, I have a WebSocket endpoint that should allow connections only if some conditions are met, otherwise it should return an HTTP 404 reply instead of upgrading the connection with HTTP 101.

As far as I understand, this is fully supported by the protocol, But I couldn't find any way to do it with FastAPI or Starlette.

If I have something like:

@router.websocket("/foo")
async def ws_foo(request: WebSocket):
    if _user_is_allowed(request):
        await request.accept()
        _handle_ws_connection(request)
    else:
        raise HTTPException(status_code=404)

The exception isn't converted to a 404 response, as FastAPI's ExceptionMiddleware doesn't seem to handle such cases.

Is there any native / built-in way of supporting this kind of "reject" flow?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

呢古 2025-01-16 13:11:24

握手完成后,协议将从 HTTP 更改为 WebSocket< /代码>。如果您尝试在 websocket 端点内引发 HTTP 异常,您会发现这是不可能的,或者返回 HTTP 响应(例如,return JSONResponse( ...status_code=404)),您将收到内部服务器错误,即 ASGI 可调用已返回但未发送握手

选项 1

因此,如果您希望在协议升级之前拥有某种检查机制,则需要使用 中间件,如下所示。在中间件内部,您不能引发异常,但可以返回响应(即 ResponseJSONResponsePlainTextResponse 等) ,这实际上就是 FastAPI 处理异常的方式这场景。作为参考,请查看这篇帖子,以及讨论此处

async def is_user_allowed(request: Request):
    # if conditions are not met, return False
    print(request['headers'])
    print(request.client)
    return False

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    if not await is_user_allowed(request):
        return JSONResponse(content={"message": "User not allowed"}, status_code=404)
    response = await call_next(request)
    return response

或者,如果您愿意,您可以使用 is_user_allowed() 方法引发需要使用 try- except 块捕获的自定义异常:

class UserException(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(message)

async def is_user_allowed(request: Request):
    # if conditions are not met, raise UserException
    raise UserException(message="User not allowed.")
    
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    try:
        await is_user_allowed(request)
    except UserException as e:
        return JSONResponse(content={"message": f'{e.message}'}, status_code=404)
    response = await call_next(request)
    return response

选项 2

但是,如果您需要使用 websocket 实例来执行此操作,您可以具有与上面相同的逻辑,但是,在 is_user_allowed() 中传递 websocket 实例> 方法,以及捕获 websocket 端点内的异常(受到 this 的启发)。

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        await is_user_allowed(ws)
        await handle_conn(ws)
    except UserException as e:
        await ws.send_text(e.message) # optionally send a message to the client before closing the connection
        await ws.close()

但是,在上面的情况下,您必须首先接受连接,以便在引发异常时可以调用 close() 方法来终止连接。如果您愿意,可以使用如下所示的内容。但是,except 块中的 return 语句将引发内部服务器错误(即,ASGI 可调用返回而不发送握手。),如上所述早些时候。

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    try:
        await is_user_allowed(ws)
    except UserException as e:
        return
    await ws.accept()
    await handle_conn(ws)

Once a handshake is completed, the protocol changes from HTTP to WebSocket. If you attempted to raise an HTTP exception inside the websocket endpoint, you would see that this is not possible, or return an HTTP response (e.g., return JSONResponse(...status_code=404)), you would get an internal server error, i.e., ASGI callable returned without sending handshake.

Option 1

Thus, if you would like to have some kind of checking mechanism before the protocol is upgraded, you would need to use a Middleware, as shown below. Inside the middleware, you can't raise an exception, but you can return a response (i.e., Response, JSONResponse, PlainTextResponse, etc), which is actually how FastAPI handles exceptions behind the scenes. As a reference, please have a look at this post, as well as the discussion here.

async def is_user_allowed(request: Request):
    # if conditions are not met, return False
    print(request['headers'])
    print(request.client)
    return False

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    if not await is_user_allowed(request):
        return JSONResponse(content={"message": "User not allowed"}, status_code=404)
    response = await call_next(request)
    return response

or, if you prefer, you can have is_user_allowed() method raising a custom exception that you need to catch with a try-except block:

class UserException(Exception):
    def __init__(self, message):
        self.message = message
        super().__init__(message)

async def is_user_allowed(request: Request):
    # if conditions are not met, raise UserException
    raise UserException(message="User not allowed.")
    
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    try:
        await is_user_allowed(request)
    except UserException as e:
        return JSONResponse(content={"message": f'{e.message}'}, status_code=404)
    response = await call_next(request)
    return response

Option 2

If, however, you need to do that using the websocket instance, you could have the same logic as above, but, instead, pass the websocket instance in is_user_allowed() method, and catch the exception inside the websocket endpoint (inspired by this).

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    try:
        await is_user_allowed(ws)
        await handle_conn(ws)
    except UserException as e:
        await ws.send_text(e.message) # optionally send a message to the client before closing the connection
        await ws.close()

In the above, however, you would have to accept the connection first, so that you can call the close() method to terminate the connection, if exception is raised. If you prefer, you could use something like the below. However, that return statement insdie the except block would throw an internal server error (i.e., ASGI callable returned without sending handshake.), as described earlier.

@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    try:
        await is_user_allowed(ws)
    except UserException as e:
        return
    await ws.accept()
    await handle_conn(ws)
凝望流年 2025-01-16 13:11:24

我有同样的问题。在某些情况下,您希望使用常规 HTTP 标头回复传入的 Websocket 升级请求。例如,有时特殊的标头用于在工作平衡器等中配置网络路由。

我花了一些时间才弄清楚,不同协议的处理程序似乎与 FastAPI 一起生活在更基本的级别上。对路由的不同协议的处理似乎是从 ASGI 服务器(例如 uvicorn)传播到 starlette 到 FastAPI 。因此,就我而言,解决方案是简单地使用 --ws nonews="none" 禁用 uvicorn 中的 Websocket 协议。因此,我使用 .get() 处理程序的常规路由会收到 websocket 升级请求,并可以使用 HTTP 标头进行回复。

这对于我的应用程序来说没问题,其中 FastAPI 充当网关,不需要实现 Websocket 本身。不幸的是,我不知道这个技术堆栈有处理混合情况的解决方案。

I had the same issue. In some circumstances, you want to reply to an incoming Websocket upgrade request with a regular HTTP header. For instance, sometimes special headers are used to configure network routing in work balancers etc.

I took me some time to figure out, that the handlers for the different protocols seem to live on a more basic level with FastAPI. The handling of different protocols for routes seem to be propagated from the ASGI server (e.g. uvicorn) to starlette to FastAPI. So in my case, the solution was to simply disable the Websocket protocol in uvicorn using --ws none or ws="none"). As a result, my regular route with .get() handler receives the websocket upgrade request and can reply with a HTTP header.

This is ok for my application, where FastAPI acts as a gateway and does not need to implement Websocket itself. Unfortunately, I don't know of a solution with this tech stack which handles mixed cases.

祁梦 2025-01-16 13:11:24

我设法在 HTTP 异常上使用 exception_handler,并在 websocket 尚未打开时直接向 ASGI 发送“websocket.http.response.start”信号。

(刚刚在 uvicorn==0.27.0 和 fastapi==0.103.1 上测试)

from fastapi import FastAPI, HTTPException
from fastapi.websockets import WebSocket
from fastapi.exception_handlers import http_exception_handler

app = FastAPI()

@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
    if not isinstance(request, WebSocket):
        return await http_exception_handler(request, exc)
    websocket = request
    if websocket.application_state == 1:  # 1 = CONNECTED
        await websocket.close(code=1008, reason=str(exc))  # 1008 = WS_1008_POLICY_VIOLATION
        return
    headers = getattr(exc, "headers") or []
    await websocket._send({"type": "websocket.http.response.start", "status": exc.status_code, "headers": headers})

I managed to do using with a exception_handler on HTTP exceptions, and sending "websocket.http.response.start" signal directly to ASGI if websocket is not open yet.

(just tested on uvicorn==0.27.0 and fastapi==0.103.1)

from fastapi import FastAPI, HTTPException
from fastapi.websockets import WebSocket
from fastapi.exception_handlers import http_exception_handler

app = FastAPI()

@app.exception_handler(HTTPException)
async def custom_http_exception_handler(request, exc):
    if not isinstance(request, WebSocket):
        return await http_exception_handler(request, exc)
    websocket = request
    if websocket.application_state == 1:  # 1 = CONNECTED
        await websocket.close(code=1008, reason=str(exc))  # 1008 = WS_1008_POLICY_VIOLATION
        return
    headers = getattr(exc, "headers") or []
    await websocket._send({"type": "websocket.http.response.start", "status": exc.status_code, "headers": headers})
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文