Python中的并行递归函数

发布于 2024-12-02 02:24:27 字数 662 浏览 1 评论 0原文

如何在 Python 中并行化递归函数?

我的函数如下所示:

def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # Heavy compute, pure function

当尝试使用 multiprocessing.Pool.map 并行化它时,Windows 打开无限数量的进程并挂起。

并行化(对于单个多核机器)的好(最好是简单)方法是什么?

这是挂起的代码:

from multiprocessing import Pool
pool = pool(processes=4)
def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + pool.map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # Heavy compute, pure function

How do I parallelize a recursive function in Python?

My function looks like this:

def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # Heavy compute, pure function

When trying to parallelize it with multiprocessing.Pool.map, Windows opens an infinite number of processes and hangs.

What's a good (preferably simple) way to parallelize it (for a single multicore machine)?

Here is the code that hangs:

from multiprocessing import Pool
pool = pool(processes=4)
def f(x, depth):
    if x==0:
        return ...
    else :
        return [x] + pool.map(lambda x:f(x, depth-1), list_of_values(x))

def list_of_values(x):
    # Heavy compute, pure function

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

走野 2024-12-09 02:24:27

好的,很抱歉出现这样的问题。

我将回答一个稍微不同的问题,其中 f() 返回列表中值的总和。这是因为从您的示例中我不清楚 f() 的返回类型是什么,并且使用整数使代码易于理解。

这很复杂,因为有两个不同的事情并行发生:

  1. 池中昂贵函数的计算
  2. f() 的递归扩展

我非常小心,只使用池来计算昂贵函数。这样我们就不会出现进程的“爆炸”,但由于这是异步的,我们需要推迟工作人员在完成昂贵的函数后调用的回调的大量工作。

更重要的是,我们需要使用倒计时锁存器,以便我们知道对 f() 的所有单独子调用何时完成。

可能有一种更简单的方法(我很确定有,但我需要做其他事情),但这也许可以让您了解什么是可能的:

from multiprocessing import Pool, Value, RawArray, RLock
from time import sleep

class Latch:

    '''A countdown latch that lets us wait for a job of "n" parts'''

    def __init__(self, n):
        self.__counter = Value('i', n)
        self.__lock = RLock()

    def decrement(self):
        with self.__lock:
            self.__counter.value -= 1
            print('dec', self.read())
        return self.read() == 0

    def read(self):
        with self.__lock:
            return self.__counter.value

    def join(self):
        while self.read():
            sleep(1)


def list_of_values(x):
    '''An expensive function'''
    print(x, ': thinking...')
    sleep(1)
    print(x, ': thought')
    return list(range(x))


pool = Pool()


def async_f(x, on_complete=None):
    '''Return the sum of the values in the expensive list'''
    if x == 0:
        on_complete(0) # no list, return 0
    else:
        n = x # need to know size of result beforehand
        latch = Latch(n) # wait for n entires to be calculated
        result = RawArray('i', n+1) # where we will assemble the map
        def delayed_map(values):
            '''This is the callback for the pool async process - it runs
               in a separate thread within this process once the
               expensive list has been calculated and orchestrates the
               mapping of f over the result.'''
            result[0] = x # first value in list is x
            for (v, i) in enumerate(values):
                def callback(fx, i=i):
                    '''This is the callback passed to f() and is called when
                       the function completes.  If it is the last of all the
                       calls in the map then it calls on_complete() (ie another
                       instance of this function) for the calling f().'''
                    result[i+1] = fx
                    if latch.decrement(): # have completed list
                        # at this point result contains [x]+map(f, ...)
                        on_complete(sum(result)) # so return sum
                async_f(v, callback)
        # Ask worker to generate list then call delayed_map
        pool.apply_async(list_of_values, [x], callback=delayed_map)


def run():
    '''Tie into the same mechanism as above, for the final value.'''
    result = Value('i')
    latch = Latch(1)
    def final_callback(value):
        result.value = value
        latch.decrement()
    async_f(6, final_callback)
    latch.join() # wait for everything to complete
    return result.value


print(run())

PS:我正在使用Python 3.2,上面的丑陋是因为我们将最终结果的计算(返回树)推迟到稍后。发电机或期货之类的东西可能会简化事情。

另外,我怀疑您需要一个缓存,以避免在使用与之前相同的参数调用时不必要地重新计算昂贵的函数。

另请参阅 yaniv 的答案 - 这似乎是反转的另一种方法通过明确深度来确定评估的顺序。

OK, sorry for the problems with this.

I'm going to answer a slightly different question where f() returns the sum of the values in the list. That is because it's not clear to me from your example what the return type of f() would be, and using an integer makes the code simple to understand.

This is complex because there are two different things happening in parallel:

  1. the calculation of the expensive function in the pool
  2. the recursive expansion of f()

I am very careful to only use the pool to calculate the expensive function. In that way we don't get an "explosion" of processes, but because this is asynchronous we need to postpone a lot of work for the callback that the worker calls once the expensive function is done.

More than that, we need to use a countdown latch so that we know when all the separate sub-calls to f() are complete.

There may be a simpler way (I am pretty sure there is, but I need to do other things), but perhaps this gives you an idea of what is possible:

from multiprocessing import Pool, Value, RawArray, RLock
from time import sleep

class Latch:

    '''A countdown latch that lets us wait for a job of "n" parts'''

    def __init__(self, n):
        self.__counter = Value('i', n)
        self.__lock = RLock()

    def decrement(self):
        with self.__lock:
            self.__counter.value -= 1
            print('dec', self.read())
        return self.read() == 0

    def read(self):
        with self.__lock:
            return self.__counter.value

    def join(self):
        while self.read():
            sleep(1)


def list_of_values(x):
    '''An expensive function'''
    print(x, ': thinking...')
    sleep(1)
    print(x, ': thought')
    return list(range(x))


pool = Pool()


def async_f(x, on_complete=None):
    '''Return the sum of the values in the expensive list'''
    if x == 0:
        on_complete(0) # no list, return 0
    else:
        n = x # need to know size of result beforehand
        latch = Latch(n) # wait for n entires to be calculated
        result = RawArray('i', n+1) # where we will assemble the map
        def delayed_map(values):
            '''This is the callback for the pool async process - it runs
               in a separate thread within this process once the
               expensive list has been calculated and orchestrates the
               mapping of f over the result.'''
            result[0] = x # first value in list is x
            for (v, i) in enumerate(values):
                def callback(fx, i=i):
                    '''This is the callback passed to f() and is called when
                       the function completes.  If it is the last of all the
                       calls in the map then it calls on_complete() (ie another
                       instance of this function) for the calling f().'''
                    result[i+1] = fx
                    if latch.decrement(): # have completed list
                        # at this point result contains [x]+map(f, ...)
                        on_complete(sum(result)) # so return sum
                async_f(v, callback)
        # Ask worker to generate list then call delayed_map
        pool.apply_async(list_of_values, [x], callback=delayed_map)


def run():
    '''Tie into the same mechanism as above, for the final value.'''
    result = Value('i')
    latch = Latch(1)
    def final_callback(value):
        result.value = value
        latch.decrement()
    async_f(6, final_callback)
    latch.join() # wait for everything to complete
    return result.value


print(run())

PS: I am using Python 3.2 and the ugliness above is because we are delaying computation of the final results (going back up the tree) until later. It's possible something like generators or futures could simplify things.

Also, I suspect you need a cache to avoid needlessly recalculating the expensive function when called with the same argument as earlier.

See also yaniv's answer - which seems to be an alternative way to reverse the order of the evaluation by being explicit about depth.

听闻余生 2024-12-09 02:24:27

经过思考,我找到了一个简单、不完整但足够好的答案:

# A partially parallel solution. Just do the first level of recursion in parallel. It might be enough work to fill all cores.
import multiprocessing

def f_helper(data):
     return f(x=data['x'],depth=data['depth'], recursion_depth=data['recursion_depth'])

def f(x, depth, recursion_depth):
    if depth==0:
        return ...
    else :
        if recursion_depth == 0:
            pool = multiprocessing.Pool(processes=4)
            result = [x] + pool.map(f_helper, [{'x':_x, 'depth':depth-1,  'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])
            pool.close()
        else:
            result = [x] + map(f_helper, [{'x':_x, 'depth':depth-1, 'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])


        return result

def list_of_values(x):
    # Heavy compute, pure function

After thinking about this, I found a simple, not complete, but good enough answer:

# A partially parallel solution. Just do the first level of recursion in parallel. It might be enough work to fill all cores.
import multiprocessing

def f_helper(data):
     return f(x=data['x'],depth=data['depth'], recursion_depth=data['recursion_depth'])

def f(x, depth, recursion_depth):
    if depth==0:
        return ...
    else :
        if recursion_depth == 0:
            pool = multiprocessing.Pool(processes=4)
            result = [x] + pool.map(f_helper, [{'x':_x, 'depth':depth-1,  'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])
            pool.close()
        else:
            result = [x] + map(f_helper, [{'x':_x, 'depth':depth-1, 'recursion_depth':recursion_depth+1 } _x in list_of_values(x)])


        return result

def list_of_values(x):
    # Heavy compute, pure function
为人所爱 2024-12-09 02:24:27

我最初存储主进程ID并将其传输到子程序。

当我需要启动多处理作业时,我检查主进程的子进程数量。如果它小于或等于我的 CPU 数量的一半,那么我将它作为并行运行。如果它大于我的 CPU 数量的一半,那么我会串行运行它。通过这种方式,它可以避免瓶颈并有效地利用 CPU 核心。您可以根据您的情况调整核心数量。例如,您可以将其设置为确切的 CPU 核心数,但不应超过它。

def subProgramhWrapper(func, args):
    func(*args)

parent = psutil.Process(main_process_id)
children = parent.children(recursive=True)
num_cores = int(multiprocessing.cpu_count()/2)

if num_cores >= len(children):
    #parallel run
    pool = MyPool(num_cores)
    results = pool.starmap(subProgram, input_params)
    pool.close()
    pool.join()
else:
    #serial run
    for input_param in input_params:
        subProgramhWrapper(subProgram, input_param)

I store the main process id initially and transfer it to sub programs.

When I need to start a multiprocessing job, I check the number of children of the main process. If it is less than or equal to the half of my CPU count, then I run it as parallel. If it greater than the half of my CPU count, then I run it serial. In this way, it avoids bottlenecks and uses CPU cores effectively. You can tune the number of cores for your case. For example, you can set it to the exact number of CPU cores, but you should not exceed it.

def subProgramhWrapper(func, args):
    func(*args)

parent = psutil.Process(main_process_id)
children = parent.children(recursive=True)
num_cores = int(multiprocessing.cpu_count()/2)

if num_cores >= len(children):
    #parallel run
    pool = MyPool(num_cores)
    results = pool.starmap(subProgram, input_params)
    pool.close()
    pool.join()
else:
    #serial run
    for input_param in input_params:
        subProgramhWrapper(subProgram, input_param)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文