将视频和数据流同时实时映射到一个子流程管道

发布于 2025-01-13 19:38:22 字数 1357 浏览 0 评论 0原文

我需要在 OpenCV/Python 中同时实时处理视频流和 klvdata 流。我正在使用 FFMPEG 读取文件或流,因为 OpenCV 不保留 klvdata。我使用 subprocess 模块将数据传递给 OpenCV。

我的问题是我不知道如何将视频和 klvdata 同时映射到同一个子进程管道?

我的代码:

#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy

command = ['ffmpeg',
    '-i', 'DayFlight.mpg',
    '-map', '0:0',
    '-map', '0:d',        
    '-pix_fmt', 'bgr24',
    '-c:v', 'rawvideo',      
    '-an','-sn',              
    '-f', 'image2pipe', '-',
    '-c:d', 'copy',
    '-f','data',
    ]

pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)

while True:
   raw_image = pipe.stdout.read(1280*720*3)
   image =  numpy.fromstring(raw_image, dtype='uint8')
   image = image.reshape((720,1280,3))          
   if image is not None:
      cv2.imshow('Video', image)
   if cv2.waitKey(1) & 0xFF == ord('q'):
      break
   for packet in klvdata.StreamParser(pipe.stdout): 
      metadata = packet.MetadataList()
      print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()

产生以下错误:

Traceback (most recent call last):
  File "test_cv.py", line 32, in <module>
    metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'

非常感谢任何帮助。

I need to process the video stream and the klvdata streams simultaneously in real-time in OpenCV/Python. I'm using FFMPEG to read the file or stream as OpenCV does not retain the klvdata. I pass the data to OpenCV with the subprocess module.

My problem is I cannot figure out how to map both the video and klvdata to the same subprocess pipe simultaneously?

My code:

#!/usr/bin/env python3
import sys, json, klvdata;
from subprocess import PIPE
import subprocess as sp
import cv2
import numpy

command = ['ffmpeg',
    '-i', 'DayFlight.mpg',
    '-map', '0:0',
    '-map', '0:d',        
    '-pix_fmt', 'bgr24',
    '-c:v', 'rawvideo',      
    '-an','-sn',              
    '-f', 'image2pipe', '-',
    '-c:d', 'copy',
    '-f','data',
    ]

pipe = sp.Popen(command, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, bufsize=10**8)

while True:
   raw_image = pipe.stdout.read(1280*720*3)
   image =  numpy.fromstring(raw_image, dtype='uint8')
   image = image.reshape((720,1280,3))          
   if image is not None:
      cv2.imshow('Video', image)
   if cv2.waitKey(1) & 0xFF == ord('q'):
      break
   for packet in klvdata.StreamParser(pipe.stdout): 
      metadata = packet.MetadataList()
      print(metadata)
pipe.stdout.flush()
cv2.destroyAllWindows()

Produces the below error:

Traceback (most recent call last):
  File "test_cv.py", line 32, in <module>
    metadata = packet.MetadataList()
AttributeError: 'UnknownElement' object has no attribute 'MetadataList'

Any help is greatly appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

为了分割视频和数据,我们可以将视频流映射到stderr管道,并将KLV数据流映射到stdout管道。

在我的以下答案中,使用了相同的技术来分离视频和音频。

当每个视频帧都有私有KLV数据(按顺序同步)时,视频帧和相应数据之间的精确同步相对简单。

Day Flight.mpg 示例文件的数据包要少得多比帧,并且使用建议的解决方案不可能实现精确的同步(我认为使用管道方法不可能)。
我们仍然可以应用一些粗略同步 - 假设数据和帧是在时间接近的情况下读取的。

分割视频和数据的建议方法:

                                            -----------
                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------    
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---      
| Data      |      |             |    |    
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)
                                            -----------

视频和数据在两个单独的线程中读取:

  • 视频读取器线程 - 读取原始视频帧(BGR)格式。
  • 数据读取器线程 - 读取并解析 KLV 数据。

根据 Wikipedia,KLV 格式没有明确定义:

密钥长度可以是 1、2、4 或 16 字节。
想必在单独的规范文档中,您会就给定应用程序的密钥长度达成一致。

在示例视频中,密钥长度为 16 字节,但不能保证...


从 stdout 管道读取 KLV 数据:
当从管道读取数据时(以类似实时的方式),我们需要知道预期读取的字节数。
这迫使我们对 KLV 数据进行部分解析:

  • 读取“密钥”(假设长度为 16 字节)。
  • 阅读“长度”——“BER 长度”标准存在一些挑战。
  • 读取“数据”(读取的大小由长度定义)。

读取完密钥、长度和数据后,我们就有了一个“KLV数据包”,我们可以发送到KLV数据解析器


以下是与 Day Flight.mpg 示例输入文件配合使用的代码示例:

#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO

# Video reader thread.
def video_reader(pipe):
    cols, rows = 1280, 720  # Assume we know frame size is 1280x720

    counter = 0
    while True:
        raw_image = pipe.read(cols*rows*3)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < cols*rows*3:
            break

        if (counter % 60) == 0:
            # Show video frame evey 60 frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image) # Show video image for testing
            cv2.waitKey(1)
        counter += 1



# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Return integer given bytes."""
    return int.from_bytes(bytes(value), byteorder='big', signed=signed)


# Data reader thread (read KLV data).
def data_reader(pipe):
    key_length = 16  # Assume key length is 16 bytes.

    f = open('data.bin', 'wb')  # For testing - store the KLV data to data.bin (binary file)

    while True:
        # https://en.wikipedia.org/wiki/KLV
        # The first few bytes are the Key, much like a key in a standard hash table data structure.
        # Keys can be 1, 2, 4, or 16 bytes in length.
        # Presumably in a separate specification document you would agree on a key length for a given application.
        key = pipe.read(key_length)  # Read the key
        
        if len(key) < key_length:
            break  # Break the loop when length is too small
        f.write(key)  # Write data to binary file for testing

        # https://github.com/paretech/klvdata/tree/master/klvdata
        # Length field
        len_byte = pipe.read(1)

        if len(len_byte) < 1:
            break  # Break the loop when length is too small
        f.write(len_byte)  # Write data to binary file for testing

        byte_length = bytes_to_int(len_byte)

        # https://github.com/paretech/klvdata/tree/master/klvdata                                                
        if byte_length < 128:
            # BER Short Form
            length = byte_length
            ber_len_bytes = b''
        else:
            # BER Long Form
            ber_len = byte_length - 128
            ber_len_bytes = pipe.read(ber_len)

            if len(ber_len_bytes) < ber_len:
                break  # Break the loop when length is too small
            f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

            length = bytes_to_int(ber_len_bytes)

        # Read the value (length bytes)
        value = pipe.read(length)
        if len(value) < length:
            break  # Break the loop when length is too small
        f.write(value)  # Write data to binary file for testing

        klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key length and data
        klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

        # Parse the KLV data
        for packet in klvdata.StreamParser(klv_data_as_bytes_io): 
            metadata = packet.MetadataList()
            print(metadata)
            print() # New line

# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '                        # Set loglevel to quiet for disabling the prints ot stderr
                               '-i "Day Flight.mpg" '                                        # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '            # Copy the data without ddecoding.
                               '-report'),                                                   # Create a log file (because we can't the statuses that are usually printed to stderr).
                                stdout=sp.PIPE, stderr=sp.PIPE)


# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()

# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()


# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()

上面的代码将数据保存到 data.bin(用于测试)。
data.bin 可用于一致性检查。
执行 FFmpeg CLI 提取数据流:

ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin

验证 raw.bindata.bin 文件是否相同。

For splitting the video and data, we may map the video stream to stderr pipe and map the KLV data stream to to stdout pipe.

The same technique is used for separating video and audio in my following answer.

Accurate synchronization between the the video frame and the corresponding data is relatively simple when each video frame has private KLV data (synchronize by sequential order).

The Day Flight.mpg sample file has much fewer data packets than frames, and accurate synchronization is not possible using the suggested solution (I don't think it is possible using the pipes approach).
We may still apply some coarse synchronization - assume the data and the frame are read in time proximity.

Suggested way for splitting the video and the data:

                                            -----------
                                       --->| Raw Video | ---> stderr (pipe)
 -----------        -------------     |     -----------    
| Input     |      | FFmpeg      |    |
| Video with| ---> | sub-process | ---      
| Data      |      |             |    |    
 -----------        -------------     |     -----------
                                       --->| KLV data  | ---> stdout (pipe)
                                            -----------

The video and the data are read in two separate threads:

  • Video reader thread - read raw video frames (in BGR) format.
  • Data reader thread - read and parse KLV data.

According to Wikipedia, the KLV format is not well defined:

Keys can be 1, 2, 4, or 16 bytes in length.
Presumably in a separate specification document you would agree on a key length for a given application.

In the sample video, the key length is 16 bytes, but it is not guaranteed...


Reading the KLV data from stdout pipe:
When reading data from a pipe (in real-time like manner), we need to know the expected number of bytes to read.
That forces us to do partial parsing of the KLV data:

  • Read the "key" (assume 16 bytes length).
  • Read the "length" - there is some challenge with "BER length" standard.
  • Read the the "data" (size to read is defined by the length).

After reading the key, length and data, we have one "KLV data packet", we can send to the KLV data parser.


Here is a code sample that woks with Day Flight.mpg sample input file:

#!/usr/bin/env python3
import klvdata
import subprocess as sp
import shlex
import threading
import numpy as np
import cv2
from io import BytesIO

# Video reader thread.
def video_reader(pipe):
    cols, rows = 1280, 720  # Assume we know frame size is 1280x720

    counter = 0
    while True:
        raw_image = pipe.read(cols*rows*3)  # Read raw video frame

        # Break the loop when length is too small
        if len(raw_image) < cols*rows*3:
            break

        if (counter % 60) == 0:
            # Show video frame evey 60 frames
            image = np.frombuffer(raw_image, np.uint8).reshape([rows, cols, 3])
            cv2.imshow('Video', image) # Show video image for testing
            cv2.waitKey(1)
        counter += 1



# https://github.com/paretech/klvdata/tree/master/klvdata
def bytes_to_int(value, signed=False):
    """Return integer given bytes."""
    return int.from_bytes(bytes(value), byteorder='big', signed=signed)


# Data reader thread (read KLV data).
def data_reader(pipe):
    key_length = 16  # Assume key length is 16 bytes.

    f = open('data.bin', 'wb')  # For testing - store the KLV data to data.bin (binary file)

    while True:
        # https://en.wikipedia.org/wiki/KLV
        # The first few bytes are the Key, much like a key in a standard hash table data structure.
        # Keys can be 1, 2, 4, or 16 bytes in length.
        # Presumably in a separate specification document you would agree on a key length for a given application.
        key = pipe.read(key_length)  # Read the key
        
        if len(key) < key_length:
            break  # Break the loop when length is too small
        f.write(key)  # Write data to binary file for testing

        # https://github.com/paretech/klvdata/tree/master/klvdata
        # Length field
        len_byte = pipe.read(1)

        if len(len_byte) < 1:
            break  # Break the loop when length is too small
        f.write(len_byte)  # Write data to binary file for testing

        byte_length = bytes_to_int(len_byte)

        # https://github.com/paretech/klvdata/tree/master/klvdata                                                
        if byte_length < 128:
            # BER Short Form
            length = byte_length
            ber_len_bytes = b''
        else:
            # BER Long Form
            ber_len = byte_length - 128
            ber_len_bytes = pipe.read(ber_len)

            if len(ber_len_bytes) < ber_len:
                break  # Break the loop when length is too small
            f.write(ber_len_bytes)  # Write ber_len_bytes to binary file for testing

            length = bytes_to_int(ber_len_bytes)

        # Read the value (length bytes)
        value = pipe.read(length)
        if len(value) < length:
            break  # Break the loop when length is too small
        f.write(value)  # Write data to binary file for testing

        klv_data = key + len_byte + ber_len_bytes + value  # Concatenate key length and data
        klv_data_as_bytes_io = BytesIO(klv_data)  # Wrap klv_data with BytesIO (before parsing)

        # Parse the KLV data
        for packet in klvdata.StreamParser(klv_data_as_bytes_io): 
            metadata = packet.MetadataList()
            print(metadata)
            print() # New line

# Execute FFmpeg as sub-process
# Map the video to stderr and map the data to stdout
process = sp.Popen(shlex.split('ffmpeg -hide_banner -loglevel quiet '                        # Set loglevel to quiet for disabling the prints ot stderr
                               '-i "Day Flight.mpg" '                                        # Input video "Day Flight.mpg"
                               '-map 0:v -c:v rawvideo -pix_fmt bgr24 -f:v rawvideo pipe:2 ' # rawvideo format is mapped to stderr pipe (raw video codec with bgr24 pixel format)
                               '-map 0:d -c copy -copy_unknown -f:d data pipe:1 '            # Copy the data without ddecoding.
                               '-report'),                                                   # Create a log file (because we can't the statuses that are usually printed to stderr).
                                stdout=sp.PIPE, stderr=sp.PIPE)


# Start video reader thread (pass stderr pipe as argument).
video_thread = threading.Thread(target=video_reader, args=(process.stderr,))
video_thread.start()

# Start data reader thread (pass stdout pipe as argument).
data_thread = threading.Thread(target=data_reader, args=(process.stdout,))
data_thread.start()


# Wait for threads (and process) to finish.
video_thread.join()
data_thread.join()
process.wait()

The above code saves the data to data.bin (for testing).
data.bin may be used for consistency check.
Execute FFmpeg CLI for extracting the data stream:

ffmpeg -y -i "Day Flight.mpg" -map 0:d -c copy -copy_unknown -f data raw.bin

Verify that raw.bin and data.bin files are equal.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文