Parallel map with module-dependent nested functions

Posted 2025-01-15 20:21:31

Here's a minimal example of what I'm trying to parallelize

import numpy as np

def generate_function(a):
    def func(x):
        '''a complex function that uses several modules'''
        return x + np.sqrt(a)
    return func

if __name__ == '__main__':
    f = generate_function(0.5)
    x = np.arange(0, 100)
    y = np.array(list(map(f, x))) # want to parallelize this step

With multiprocessing, the nested func causes problems, since pickle can't serialize nested functions

import multiprocessing as mp
...
pool = mp.Pool(2)
y = np.array(pool.map(f, x))

AttributeError: Can't pickle local object 'generate_function.<locals>.func'

Even with pathos, the modules are not imported in the worker processes

import pathos
...
pool = pathos.multiprocessing.ProcessPool(2)
y = np.array(pool.map(f, x))

NameError: name 'np' is not defined

Note that none of the other solutions on "Python multiprocessing PicklingError: Can't pickle <type 'function'>" work either

What's the best way to parallelize this?


So it is possible to get pathos to work by reimporting inside of generate_function

def generate_function(a):
    import numpy as np
    def func(x):
        '''a complex function that uses several modules'''
        return x + np.sqrt(a)
    return func

but I may have several imports across multiple generate_functions and multiple layers of nesting, and keeping track of all that quickly gets cumbersome, so I would like to avoid this mess

def generate_function1(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def func(x):
        ...
    return func

def generate_function2(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def func(x):
        ...
    return func

def generate_generator_function(a):
    import module1, module2, module3
    from module4 import a, b
    from module5 import c as d
    from module6 import e as f
    def generate_function(a):
        import module1, module2, module3
        from module4 import a, b
        from module5 import c as d
        from module6 import e as f
        def func(x):
            ...
        return func
    return generate_function
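One way to sidestep both the pickling error and the re-import bookkeeping, not mentioned in the question, is a sketch worth noting: replace the closure with a module-level class that stores `a` on the instance. Module-level classes pickle by reference and instance state pickles by value, so plain `multiprocessing.Pool.map` can ship such a callable to workers, and `np` resolves through the module's top-level import. The class name `Func` here is illustrative, not from the original post.

```python
import pickle
import numpy as np

class Func:
    """Picklable replacement for generate_function's closure.

    The instance carries 'a' in its __dict__, and the class itself is
    pickled by reference, so no imports need to be repeated inside the
    function body.
    """
    def __init__(self, a):
        self.a = a

    def __call__(self, x):
        # np is looked up in this module's globals, which the worker
        # process rebuilds by importing the module
        return x + np.sqrt(self.a)

f = Func(0.5)
# round-trips through pickle, unlike the nested func
g = pickle.loads(pickle.dumps(f))
print(g(0.0))  # same result as f(0.0)
```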

Comments (2)

手心的海 2025-01-22 20:21:31

You may use concurrent.futures:

import concurrent.futures
import numpy as np  # generate_function as defined in the question

f = generate_function(0.5)
x = np.arange(0, 100)
with concurrent.futures.ThreadPoolExecutor() as ex:
    y = ex.map(f, x)
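For reference, a self-contained version of this answer, with the question's `generate_function` inlined: threads run in the same process, so the closure is never pickled and works unchanged. The trade-off (not stated in the answer) is that the GIL limits speedup for pure-Python work, though NumPy releases it inside many of its C routines.

```python
import concurrent.futures
import numpy as np

def generate_function(a):
    def func(x):
        '''a complex function that uses several modules'''
        return x + np.sqrt(a)
    return func

f = generate_function(0.5)
x = np.arange(0, 100)
# no pickling happens here: the worker threads share the interpreter,
# so the nested closure can be submitted directly
with concurrent.futures.ThreadPoolExecutor() as ex:
    y = np.array(list(ex.map(f, x)))
print(y[:3])
```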
千里故人稀 2025-01-22 20:21:31


This won't solve your pickle problems, but here's my thinking on using classes to manage your imports.

>>> class funcFactory:
...     import numpy as np
...     def __init__(self):
...             pass
...     def makef(self,a):
...             def func(x):
...                     return a+funcFactory.np.sqrt(x)
...             return func
...
>>> ff = funcFactory()
>>> f = ff.makef(1)
>>> f(4)
3.0

Incorporating @Schotty's suggestion to use concurrent.futures you end up with code that looks like this:

import concurrent.futures
import numpy as np

class funcFactory:
    import numpy as np
    def makef(self,a):
        def func(x):
            return a+funcFactory.np.sqrt(x)
        func.__reduce__ = lambda:""
        return func

f = funcFactory().makef(0.5)
with concurrent.futures.ThreadPoolExecutor() as ex:
    y = ex.map(f, np.arange(0, 100))
print(list(y))
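Another route neither answer covers, sketched here under the assumption that the function body can be lifted to module level: `functools.partial` applied to a top-level function. The partial object pickles cleanly (the function by reference, the bound argument by value), so it can be passed to a real `multiprocessing.Pool.map`. The name `_func` is illustrative.

```python
import functools
import pickle
import numpy as np

def _func(a, x):
    '''top-level function, so pickle can find it by qualified name'''
    return x + np.sqrt(a)

# binding 'a' with partial keeps the callable picklable, unlike a
# closure; this f can be handed to multiprocessing.Pool.map directly
f = functools.partial(_func, 0.5)
g = pickle.loads(pickle.dumps(f))  # round-trips, no AttributeError
print(g(1.0))
```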