在python服务器中接收、处理和发送多个流

发布于 2025-01-16 06:29:32 字数 3702 浏览 3 评论 0原文

我有一个 React 应用程序,它使用 Janus WebRTC 服务器在客户端之间共享视频流(使用 videoroom 插件)。可以有可变数量的房间,每个房间中可以有可变数量的客户。

我还有一个 python 服务器,它使用 GStreamer 和 OpenCV 接收视频流(只有一个从 Janus 服务器 rtp 转发的流)并使用 YOLO 神经网络处理其帧(这工作正常)。

编辑:我当前使用 Janus 发送的 rtp 转发请求如下所示:

rtpForward (room: number, publisher_id: number): void {
    try {
      const message = {
        request: 'rtp_forward',
        room,
        publisher_id,
        host: '192.my.local.ip',
        host_family: 'ipv4', 
        video_port: 6001, 
        videopt: 100,
      }
      this.videoRoomPlugin &&
        this.videoRoomPlugin.send({
          message,
          success: result => {
            console.log('::: videoPlugin - RTP forwarding', result)
          },
          error: error => {
            console.error('::: videoPlugin - RTP forwarding ERROR:', error)
          }
        })
    } catch (error) {
      console.log('Error rtp forwarding: ', error)
    }
  }

我在 python 服务器中用于获取 rtp 转发的代码如下所示:

import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp, GLib
from threading import Thread
import cv2
import torch
from av import VideoFrame

Gst.init()

# Precictor to make the object detection on the frames
predictor = torch.hub.load('ultralytics/yolov5', 'yolov5s')  # yolov5s or yolov5m, yolov5x, custom
frame_count = 0
frame_threshold = 5
# Global variable holding the image array
image_arr = None

def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()

    arr = numpy.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=numpy.uint8)
    return arr


main_loop = GLib.MainLoop()
main_loop_thread = Thread(target=main_loop.run)
main_loop_thread.start()

pipeline = Gst.parse_launch('udpsrc port=6001 caps="application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)VP8, payload=(int)96" ! queue ! rtpvp8depay ! decodebin ! videoconvert ! video/x-raw, format=BGR ! appsink name=sink sync=true max-buffers=1 drop=true')
appsink = pipeline.get_by_name("sink")
pipeline.set_state(Gst.State.PLAYING)

try:
    while True:
        sample = appsink.try_pull_sample(Gst.SECOND)

        if sample is None:
            continue

        # Convert the sample to a numpy array to manage with cv2
        frame = gst_to_opencv(sample)

        # Determine whether its a frame where a detection should be performed
        if (frame_count < frame_threshold):
            # No detection -> do nothing
            frame_count += 1
            new_frame = sample
        else:
            # Detection -> Apply predictor to frame and rendet the results on it
            image = frame
            outputs = predictor(image) # Apply predictor
            outputs.render() # Render predictor results on the frame

            # Prepare output in the new_frame variable
            new_frame = VideoFrame.from_ndarray(outputs.imgs[0], format="bgr24")
            frame_count = 0
except KeyboardInterrupt:
    pass

pipeline.set_state(Gst.State.NULL)
main_loop.quit()
main_loop_thread.join()

现在我想要进行下一步并处理多个客户端流,另外,发送处理后的流返回到相应的客户端。

设想的解决方案:

首先,我考虑使用 python 服务器加入 Janus 房间来订阅它们的提要,但这需要处理 Janus 的多个会话并使每个人都成为发布者和订阅者。

其次,我考虑保留 Janus 部分以在客户端之间共享提要,并将流从客户端直接发送到 python 服务器(以及直接从服务器发送回)。

由于这看起来是一项相当复杂的任务,我问:你认为我的选择有意义吗?您认为其中哪一个更好?

我觉得当前 Janus rtp 转发的设置是不正确的,因为我认为当流到达 python 服务器时,如果它们来自 Janus 这样的流,我无法识别它们。直接在客户端和 python 服务器之间共享提要是否更好?这可以通过网络套接字来实现吗?

提前感谢您的帮助!

I have a React application which uses a Janus WebRTC server to share video streams between clients (using the videoroom plugin). There can be a variable number of rooms with a variable number of clients in each room.

I also have a python server that uses GStreamer and OpenCV to receive a video stream (just one stream rtp-forwarded from the Janus server) and process its frames with a YOLO neural network (this works fine).

Edit: The rtp forward request I currently send with Janus looks like this:

rtpForward (room: number, publisher_id: number): void {
    try {
      const message = {
        request: 'rtp_forward',
        room,
        publisher_id,
        host: '192.my.local.ip',
        host_family: 'ipv4', 
        video_port: 6001, 
        videopt: 100,
      }
      this.videoRoomPlugin &&
        this.videoRoomPlugin.send({
          message,
          success: result => {
            console.log('::: videoPlugin - RTP forwarding', result)
          },
          error: error => {
            console.error('::: videoPlugin - RTP forwarding ERROR:', error)
          }
        })
    } catch (error) {
      console.log('Error rtp forwarding: ', error)
    }
  }

The code I use in the python server to get the rtp-forward looks like this:

import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstApp', '1.0')
from gi.repository import Gst, GstApp, GLib
from threading import Thread
import cv2
import torch
from av import VideoFrame

Gst.init()

# Precictor to make the object detection on the frames
predictor = torch.hub.load('ultralytics/yolov5', 'yolov5s')  # yolov5s or yolov5m, yolov5x, custom
frame_count = 0
frame_threshold = 5
# Global variable holding the image array
image_arr = None

def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()

    arr = numpy.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=numpy.uint8)
    return arr


main_loop = GLib.MainLoop()
main_loop_thread = Thread(target=main_loop.run)
main_loop_thread.start()

pipeline = Gst.parse_launch('udpsrc port=6001 caps="application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)VP8, payload=(int)96" ! queue ! rtpvp8depay ! decodebin ! videoconvert ! video/x-raw, format=BGR ! appsink name=sink sync=true max-buffers=1 drop=true')
appsink = pipeline.get_by_name("sink")
pipeline.set_state(Gst.State.PLAYING)

try:
    while True:
        sample = appsink.try_pull_sample(Gst.SECOND)

        if sample is None:
            continue

        # Convert the sample to a numpy array to manage with cv2
        frame = gst_to_opencv(sample)

        # Determine whether its a frame where a detection should be performed
        if (frame_count < frame_threshold):
            # No detection -> do nothing
            frame_count += 1
            new_frame = sample
        else:
            # Detection -> Apply predictor to frame and rendet the results on it
            image = frame
            outputs = predictor(image) # Apply predictor
            outputs.render() # Render predictor results on the frame

            # Prepare output in the new_frame variable
            new_frame = VideoFrame.from_ndarray(outputs.imgs[0], format="bgr24")
            frame_count = 0
except KeyboardInterrupt:
    pass

pipeline.set_state(Gst.State.NULL)
main_loop.quit()
main_loop_thread.join()

Now I want to make the next step and handle multiple client streams, plus, send the processed streams back to the corresponding clients.

Contemplated solutions:

First, I thought about joining the Janus rooms with the python server to subscribe to their feeds, but this would require to handle multiple sessions of Janus and making everyone publishers and subscribers.

Second, I thought about keeping the Janus part to share feeds between clients and also send the streams directly to the python server from the clients (as well as sending them back from the server directly).

So as this looks like a quite complicated task, I ask: do you think that my options make sense? Which one of them do you think is better?

I feel like the current setup with the Janus rtp-forwarding is incorrect, as I think that I have no way of identifying the streams when they reach the python server if they come from Janus like that. Is it better to share the feeds between the clients and the python server directly? This could be achieved with web sockets?

Thank you for your help in advance!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文