Python多处理进度方法

发布于 2025-01-22 03:54:58 字数 2639 浏览 2 评论 0 原文

我一直在忙着编写我的第一个多处理代码,是的,是的。 但是,现在我想要一些进度的反馈,我不确定最好的方法是什么。

  • ​流程完成一个文件,它发送了一条“完成”消息。
  • 主代码保留了我读了有关队列,池,tqdm的各种信息的数量完成的
Core 0  processing file 20 of 317 ||||||____ 60% completed
Core 1  processing file 21 of 317 |||||||||_ 90% completed
...
Core 7  processing file 18 of 317 ||________ 20% completed

我不确定要走哪种方式。谁能指出在这种情况下可以使用的方法?

提前致谢!

编辑:更改了我的代码,该代码启动了GSB22

我的代码的建议:

# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2


# >>> Enter directory to scan as target directory
targetDirectory = "E:\Projects\Programming\Python\OpenCV\\videofiles"

def get_videofiles(target_directory):

    # Find all video files in directory and subdirectories and put them in a list
    videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
    # Return the list
    return videofiles


def process_file(videofile):

    '''
    What happens inside this function:
    - The video is processed and analysed using openCV
    - The result (an image) is saved to the results folder
    - Once this function receives the videofile it completes
      without the need to return anything to the main program
    '''

    # The processing code is more complex than this code below, this is just a test
    cap = cv2.VideoCapture(videofile)

    for i in range(10):
        succes, frame = cap.read()

        # cv2.imwrite('{}/_Results/{}_result{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)

        if succes:
            try:
                cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
            except:
                print('something went wrong')


if __name__ == "__main__":

    # Create directory to save results if it doesn't exist
    if not os.path.exists(targetDirectory + '/_Results'):
        os.makedirs(targetDirectory + '/_Results')

    # Get a list of all video files in the target directory
    all_files = get_videofiles(targetDirectory)

    print(f'{len(all_files)} video files found')

    # Create list of jobs (processes)
    jobs = []

    # Create and start processes
    for file in all_files:
        proc = Process(target=process_file, args=(file,))
        jobs.append(proc)

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # TODO: Print some form of progress feedback

    print('Finished :)')

I've been busy writing my first multiprocessing code and it works, yay.
However, now I would like some feedback of the progress and I'm not sure what the best approach would be.

What my code (see below) does in short:

  • A target directory is scanned for mp4 files
  • Each file is analysed by a separate process, the process saves a result (an image)

What I'm looking for could be:

  1. Simple
  • Each time a process finishes a file it sends a 'finished' message
  • The main code keeps count of how many files have finished
  1. Fancy
Core 0  processing file 20 of 317 ||||||____ 60% completed
Core 1  processing file 21 of 317 |||||||||_ 90% completed
...
Core 7  processing file 18 of 317 ||________ 20% completed

I read all kinds of info about queues, pools, tqdm and I'm not sure which way to go. Could anyone point to an approach that would work in this case?

Thanks in advance!

EDIT: Changed my code that starts the processes as suggested by gsb22

My code:

# file operations
import os
import glob
# Multiprocessing
from multiprocessing import Process
# Motion detection
import cv2


# >>> Enter directory to scan as target directory
targetDirectory = "E:\Projects\Programming\Python\OpenCV\\videofiles"

def get_videofiles(target_directory):

    # Find all video files in directory and subdirectories and put them in a list
    videofiles = glob.glob(target_directory + '/**/*.mp4', recursive=True)
    # Return the list
    return videofiles


def process_file(videofile):

    '''
    What happens inside this function:
    - The video is processed and analysed using openCV
    - The result (an image) is saved to the results folder
    - Once this function receives the videofile it completes
      without the need to return anything to the main program
    '''

    # The processing code is more complex than this code below, this is just a test
    cap = cv2.VideoCapture(videofile)

    for i in range(10):
        succes, frame = cap.read()

        # cv2.imwrite('{}/_Results/{}_result{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)

        if succes:
            try:
                cv2.imwrite('{}/_Results/{}_result_{}.jpg'.format(targetDirectory, os.path.basename(videofile), i), frame)
            except:
                print('something went wrong')


if __name__ == "__main__":

    # Create directory to save results if it doesn't exist
    if not os.path.exists(targetDirectory + '/_Results'):
        os.makedirs(targetDirectory + '/_Results')

    # Get a list of all video files in the target directory
    all_files = get_videofiles(targetDirectory)

    print(f'{len(all_files)} video files found')

    # Create list of jobs (processes)
    jobs = []

    # Create and start processes
    for file in all_files:
        proc = Process(target=process_file, args=(file,))
        jobs.append(proc)

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

    # TODO: Print some form of progress feedback

    print('Finished :)')

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

ζ澈沫 2025-01-29 03:54:58

我阅读了有关队列,池,TQDM的各种信息,我不确定该走哪种方式。谁能指出在这种情况下可以使用的方法?

这是一种以最低成本获取进度指示的非常简单的方法:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    sleep(randint(1, 3))
    return randint(0, 100) < 70


files = [f"file-{i}.mp4" for i in range(20)]

success = []
failed = []
NPROC = 5
pool = Pool(NPROC)


for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
    if status:
        success.append(fn)
    else:
        failed.append(fn)

print(f"{len(success)} succeeded and {len(failed)} failed")

一些评论:

  • TQDM是一个第三方库,非常好的实现了Progressbars。还有其他。 PIP安装TQDM
  • 我们使用一个池(几乎没有理由为这样的简单事物管理流程), nproc 流程。我们让池在输入数据上处理我们的过程函数。
  • 我们通过使函数返回布尔值来发出信号(在此示例中,我们随机选择加权以取得成功)。我们不会返回文件名,尽管我们可以,因为必须将其从子过程中序列化并发送,这是不必要的开销。
  • 我们使用 pool.imap ,它返回一个迭代器,该迭代器保持与我们传递的含义相同的顺序。因此我们可以使用 zip 迭代 files >直接。由于我们使用尺寸未知的迭代器,因此需要告诉 tqdm 它是多长时间的。 (我们本可以使用 pool.map ,但是没有必要提交RAM--尽管对于一个布尔来说,这可能没有什么区别。)

我故意将其写成一种食谱。您只需使用范式中的高级下降和池。

参考


https://tqdm.github.io/

I read all kinds of info about queues, pools, tqdm and I'm not sure which way to go. Could anyone point to an approach that would work in this case?

Here's a very simple way to get progress indication at minimal cost:

from multiprocessing.pool import Pool
from random import randint
from time import sleep

from tqdm import tqdm


def process(fn) -> bool:
    sleep(randint(1, 3))
    return randint(0, 100) < 70


files = [f"file-{i}.mp4" for i in range(20)]

success = []
failed = []
NPROC = 5
pool = Pool(NPROC)


for status, fn in tqdm(zip(pool.imap(process, files), files), total=len(files)):
    if status:
        success.append(fn)
    else:
        failed.append(fn)

print(f"{len(success)} succeeded and {len(failed)} failed")

Some comments:

  • tqdm is a 3rd-party library which implements progressbars extremely well. There are others. pip install tqdm.
  • we use a pool (there's almost never a reason to manage processes yourself for simple things like this) of NPROC processes. We let the pool handle iterating our process function over the input data.
  • we signal state by having the function return a boolean (in this example we choose randomly, weighting in favour of success). We don't return the filename, although we could, because it would have to be serialised and sent from the subprocess, and that's unnecessary overhead.
  • we use Pool.imap, which returns an iterator which keeps the same order as the iterable we pass in. So we can use zip to iterate files directly. Since we use an iterator with unknown size, tqdm needs to be told how long it is. (We could have used pool.map, but there's no need to commit the ram---although for one bool it probably makes no difference.)

I've deliberately written this as a kind of recipe. You can do a lot with multiprocessing just by using the high-level drop in paradigms, and Pool.[i]map is one of the most useful.

References

https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
https://tqdm.github.io/

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文