Python:如何检查 multiprocessing.Pool 中待处理任务的数量?

发布于 2024-10-30 07:17:36 字数 671 浏览 4 评论 0原文

我有一小部分工作人员 (4) 和一个非常大的任务列表 (5000~)。我正在使用一个池并使用 map_async() 发送任务。因为我正在运行的任务相当长,所以我强制将块大小设置为 1,以便一个长进程无法容纳一些较短的进程。

我想做的是定期检查还有多少任务需要提交。我知道最多有 4 个会处于活动状态,我关心的是还剩下多少个需要处理。

我用谷歌搜索了一下,找不到任何人这样做。

一些简单的代码可以提供帮助:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

I have a small pool of workers (4) and a very large list of tasks (5000~). I'm using a pool and sending the tasks with map_async(). Because the task I'm running is fairly long, I'm forcing a chunksize of 1 so that one long process can't hold up some shorter ones.

What I'd like to do is periodically check how many tasks are left to be submitted. I know at most 4 will be active, I'm concerned with how many are left to process.

I've googled around and I can't find anybody doing this.

Some simple code to help:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

许仙没带伞 2024-11-06 07:17:36

看起来 jobs._number_left 就是您想要的。 _ 表示它是一个内部值,可能会根据开发人员的突发奇想而改变,但这似乎是获取该信息的唯一方法。

Looks like jobs._number_left is what you want. _ indicates that it is an internal value that may change at the whim of the developers, but it seems to be the only way to get that info.

究竟谁懂我的在乎 2024-11-06 07:17:36

假设您使用的是 apply_async,您可以通过查看 Pool._cache 属性来检查待处理作业的数量。这是存储 ApplyResult 的位置,直到它们可用并且等于待处理的 ApplyResult 数量。

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()

You can check the number of pending jobs by seeing Pool._cache attribute assuming that you are using apply_async. This is where ApplyResult is stored until they are available and equals to the number of ApplyResults pending.

import multiprocessing as mp
import random
import time


def job():
    time.sleep(random.randint(1,10))
    print("job finished")

if __name__ == '__main__':
    pool = mp.Pool(5)
    for _ in range(10):
        pool.apply_async(job)

    while pool._cache:
        print("number of jobs pending: ", len(pool._cache))
        time.sleep(2)

    pool.close()
    pool.join()
2024-11-06 07:17:36

据我所知,这不是一个完美的方法,但如果您使用 Pool.imap_unordered() 函数而不是 map_async,则可以拦截正在处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我正在减去 process_count,因为您几乎可以假设所有进程都将在以下两个例外之一的情况下进行处理:1)如果您使用迭代器,则可能没有更多的项目可供使用和处理和 2) 您剩余的物品可能少于 4 件。我没有为第一个异常编写代码。但如果您需要的话,这样做应该很容易。不管怎样,你的例子使用了一个列表,所以你不应该遇到这个问题。

编辑:我还意识到您正在使用 While 循环,这使得您看起来像是在尝试定期更新某些内容,例如每半秒或其他内容。我作为示例给出的代码不会这样做。我不确定这是否有问题。

No airtight way that I know of, but if you use the Pool.imap_unordered() function instead of map_async, you can intercept the elements that are processed.

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

I'm subtracting process_count, because you can pretty much assume that all processes will be processing with one of two exceptions: 1) if you use an iterator, there may not be further items left to consume and process, and 2) You may have fewer than 4 items left. I didn't code in for the first exception. But it should be pretty easy to do so if you need to. Anyway, your example uses a list so you shouldn't have that problem.

Edit: I also realized you're using a While loop, which makes it look like you're trying to update something periodically, say, every half second or something. The code I gave as an example will not do it that way. I'm not sure if that's a problem.

深居我梦 2024-11-06 07:17:36

我有类似的要求:跟踪进度,根据结果执行临时工作,随时干净地停止所有处理。我的处理方法是使用 apply_async 一次发送一个任务。我所做的工作的高度简化版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用队列而不是返回结果。

I have similar requirements: track progress, perform interim work based on the results, stop all processing cleanly at any arbitrary time. How I've dealt with it is to send tasks one at a time with apply_async. A heavily simplified version of what I do:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

Note that I use a Queue instead of returning the results.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文