Python: _pickle.PicklingError: 无法 pickle >

发布于 2025-01-16 08:20:58 字数 1401 浏览 3 评论 0 原文

我正在运行 Python 3.9.1

注意:我知道有类似标题的问题。但这些问题嵌入在复杂的代码中,导致很难理解问题。这是问题的简单实现,我认为其他人会发现更容易理解。

编辑:我的代码中有 Pool(processes=64) 。但大多数其他人可能必须根据计算机上有多少个核心来更改此设置。如果花费的时间太长,请将 listLen 更改为较小的数字。

我正在尝试了解多处理以解决工作中的问题。我有一个数组列表,需要对数组进行成对比较。但为了简单起见,我使用简单的整数而不是数组和加法函数而不是调用一些复杂的比较函数来重新创建问题的要点。使用下面的代码,我遇到了标题错误

import time
from multiprocessing import Pool
import itertools
import random

def add_nums(a, b):
    return(a + b)

if __name__ == "__main__":
    listLen = 1000
    
    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)),2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

I'm running Python 3.9.1

Note: I know there are questions with similar titles. But those questions are embedded in complicated code that makes it hard to understand the problem. This is a bare-bones implementation of the problem which I think others will find easier to digest.

EDIT: I have Pool(processes=64) in my code. But most other will probably have to change this according to how many cores there are on their computer. And if it takes too long, change listLen to a smaller number

I'm trying to learn about multiprocessing in order to solve a problem at work. I have a list of arrays with which I need to do a pairwise comparison of the arrays. But for simplicity, I've recreated the gist of the problem with simple integers instead of arrays and an addition function instead of a call to some complicated comparison function. With the code below, I'm running into the titular error

import time
from multiprocessing import Pool
import itertools
import random

def add_nums(a, b):
    return(a + b)

if __name__ == "__main__":
    listLen = 1000
    
    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)),2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

凡间太子 2025-01-23 08:20:58

我不太确定为什么(尽管彻底阅读了 multiprocessing docs 可能会有答案),但是 python 的多处理涉及一个 pickling 过程,其中子进程会传递某些东西。虽然我预计 lambda 会被继承,而不是通过 pickle-ing 传递,但我想事实并非如此。

按照评论中的讨论,考虑类似这种方法:

import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory

def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop  = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)),2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))

    print(f"Process took {t.delta} seconds with no MP")


    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i,j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()

    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()

    print("Results match.")

它使用 multiprocessing.shared_memory 在主机进程和子进程之间共享单个 numpy (N+1) 维数组(而不是 N 维数组的列表)。

其他不同但无关紧要的事情:

  • Pool 用作上下文管理器,以防止必须显式关闭并加入它。
  • Timer 是一个简单的上下文管理器,用于对代码块进行计时。
  • 一些数字已随机调整
  • pool.map 替换为对 pool.apply_async 的调用

pool.map 也可以,但您' d 希望在 .map 调用之前构建参数列表并将其解压到工作函数中,例如:

with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i,j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# and 

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....

I'm not exactly sure why (though a thorough read through the multiprocessing docs would probably have an answer), but there's a pickling process involved in python's multiprocessing where child processes are passed certain things. While I would have expected the lambdas to be inherited and not passed via pickle-ing, I guess that's not what's happening.

Following the discussion in the comments, consider something like this approach:

import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory

def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop  = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)),2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))

    print(f"Process took {t.delta} seconds with no MP")


    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i,j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()

    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()

    print("Results match.")

It uses multiprocessing.shared_memory to share a single numpy (N+1)-dimensional array (instead of a list of N-dimensional arrays) between the host process and child processes.

Other things that are different but don't matter:

  • Pool is used as a context manager to prevent having to explicitly close and join it.
  • Timer is a simply context manager to time blocks of code.
  • Some of the numbers have been adjusted randomly
  • pool.map replaced with calls to pool.apply_async

pool.map would be fine too, but you'd want to build the argument list before the .map call and unpack it in the worker function, e.g.:

with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i,j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# and 

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....
睫毛上残留的泪 2025-01-23 08:20:58

Python 无法pickle lambda 函数。相反,您应该定义函数并传递函数名称。以下是解决此问题的方法:

import itertools
import random
import time
from multiprocessing import Pool


def add_nums(a, b):
    return a + b


def foo(x):
    return add_nums(x[0], x[1])


if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

我修改了 index_combns 以也从 myList 中提取值,因为 myList 无法从 < code>foo 并传入 myList 的多个副本将增加脚本的空间复杂度。

运行此打印:

Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP

Python cannot pickle lambda functions. Instead you should define the function and pass the function name instead. Here is how you may approach this:

import itertools
import random
import time
from multiprocessing import Pool


def add_nums(a, b):
    return a + b


def foo(x):
    return add_nums(x[0], x[1])


if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

I modified index_combns to also extract the value from myList in place, because myList will not be accessible from foo and passing in multiple copies of myList will increase space complexity of your script.

Running this prints:

Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP
放飞的风筝 2025-01-23 08:20:58

问:
“...尝试了解多处理以解决工作中的问题。”

答:
需要学习的最重要的经验
是(流程)-实例成本有多大,
所有其他附加间接费用
(仍然绝不是不可忽视的,随着问题规模的扩大,更多)
与这个巨大的&相比,细节是更多的。校长一.

在通读并完全理解答案之前,这里有一个实时 GUI 交互式模拟器我们需要支付多少费用才能开始使用超过 1 个流程编排流(成本各不相同 - 线程较低,MPI 较高基于分布式操作,最高为multiprocessing-进程,如 Python 解释器中使用的那样,其中首先复制主 Python 解释器进程的 N 个副本(分配 RAM 并生成 O/S 调度程序 - 2022 年第二季度仍然报告问题,如果较便宜的后端试图避免这种成本,但由于错误共享或错误复制或忘记复制一些已经阻塞的 MUTEX-es 和类似的内部结构而导致死锁问题 - 因此即使是完整副本2022 年并不总是安全 - 正如无数专业人士所记录的那样,没有亲自见到它们并不意味着它们仍然存在 - a 关于一群鲨鱼的故事是一个很好的起点)

Live Simulatordescription here

问题清单:

a) pickling lambdas < support> (以及许多其他 SER/DES 拦截器)

很简单 - conda install dillimport dill as 就足够了pickle 作为 dill 可以,多年来pickle它们 - 归功于@MikeMcKearns并且您的代码不需要重构简单的pickle.dumps()调用接口的使用。因此,使用pathos.multiprocess默认在内部使用dill,并且可以避免多年来已知的multiprocessing SER/DES弱点。

b ) 性能杀手

- multiprocessing.Pool.map() 是一个端到端的性能反模式此处 - 成本...,如果我们开始不忽视它们,请显示有多少 CPU 时钟和内存。阻塞的物理 RAM-I/O 传输是为如此多的进程实例(60+)付出代价的,这些实例最终“占用”了几乎所有物理 CPU 核心,但为真正的高性能留下了几乎为零的空间numpy< /code>-核心问题的本机多核计算(最终性能预计会得到提升,不是吗?)

- 只需移动p - 模拟器中的滑块小于 100%(没有 [SERIAL] - 问题执行的一部分,这在理论上,但在实践中永远不可能,甚至程序启动也是纯粹的-[SERIAL],按设计)

-只需移动Overhead - 将模拟器中的滑块设置为任何高于普通零的值(表示生成之一的相对附加成本NCPUcore 个进程,以百分比表示,相对于指令的这种[PARALLEL]部分部分数量 - 数学上“密集”的工作有很多这样的“有用”-说明,并且可能,假设没有其他性能杀手跳出盒子,可能会花费一些合理数量的“附加”成本,以产生一定数量的并发或并行操作(实际数量仅取决于实际的成本经济性,不在于有多少个CPU核心,更不在于我们的“愿望”或学术甚至更糟糕的复制/粘贴“建议”)。相反,数学上“浅薄”的工作几乎总是“加速”<< 1(巨大减速),因为几乎没有机会证明已知的附加成本(在流程实例上支付、数据 SER/xfer/DES 移动在(参数)和返回(结果))

- 接下来将模拟器中的 Overhead-滑块移动到最右边缘<代码>== 1。这显示了这种情况,当实际的进程生成开销(损失的时间)成本仍然不超过所有计算的 <= 1 % 时接下来是相关的指令,这些指令将针对工作的“有用”部分执行,这将在此类生成的流程实例内进行计算。因此,即使是这样的 1:100 比例因子(比损失的 CPU 时间多 100 倍的“有用”工作,因为安排许多副本并使 O/S 调度程序在可用系统虚拟内存中协调其并发执行而被烧毁)加速降级进程图表中已经显示了所有警告 - 只需稍微玩一下Overhead - 模拟器中的滑块,在接触其他滑块之前...

- 避免“共享”的罪恶(如果性能是目标) - 同样,在现在独立的多个 Python 解释器进程之间运行此类编排的成本需要额外的附加成本,而在获得性能提升方面永远不合理,因为争夺共享资源(CPU 核心、物理 RAM-I/ O通道)只会造成破坏CPU 核心缓存重用命中率、操作系统调度程序操作的进程上下文切换以及所有这些进一步降低了端到端性能(这是我们不想要的,不是吗?)

c) 提高绩效

-尊重有关任何类型计算操作的实际成本
- 避免“浅层”计算步骤,
- 最大限度地提高一组分布式进程的成本(如果仍然需要)所以),
-避免所有增加开销的操作(例如添加本地临时变量,其中内联操作允许就地存储部分结果)

-尝试使用超高性能的缓存-线路友好&优化的、原生的numpy-矢量化多核和striding-tricks 功能,不会因为调度如此多(约 60 个)Python 解释器进程副本而被预重载的 CPU 核心阻塞,每个副本都尝试调用 numpy 代码,因此没有任何可用核心实际上将这种高性能、缓存重用友好的矢量化计算置于(我们得到或失去大部分性能,而不是在运行缓慢的串行迭代器中,而不是在产生 60 多个基于进程的“__main__”Python 解释器的完整副本,在对我们的大量数据进行单个有用的工作之前,昂贵的 RAM 分配和物理复制 60 多次)

-对实际问题的重构绝不能违背已收集的有关性能的知识,因为重复不起作用的事情会不带任何优势,是吗?
- 尊重您的物理平台限制,忽略它们会降低您的性能
- 基准测试、配置文件、重构
- 基准测试、配置文件、重构
- 基准测试、配置文件、重构
这里没有其他可用的魔杖

一旦已经致力于性能的前沿,请在将Python解释器生成N个复制副本之前设置gc.disable(),不等待自发追求终极性能时的垃圾收集

Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "

A :
the single most important piece of experience to learn
is how BIG are the COSTS-of-( process )-INSTANTIATION(s),
all other add-on overhead costs
( still by no means not negligible, the more in growing the scales of the problem )
are details in comparison to this immense & principal one.

Before the answer is read-through and completely understood, here is a Live GUI-interactive simulator of how much we will have to pay to start using more than 1 stream of process-flow orchestration ( costs vary - lower for threads, larger for MPI-based distributed operations, highest for multiprocessing-processes, as used in Python Interpreter, where N-many copies of the main Python Interpreter process get first copied (RAM-allocated and O/S scheduler spawned - as 2022-Q2 still reports issues if less expensive backends try to avoid this cost, yet at problems with deadlocking on wrong-shared or ill-copied or forgotten to copy some already blocking MUTEX-es and similar internalities - so that even the full-copy is not always safe in 2022 - not having met them in person does not mean these do not still exist, as documented by countless professionals - a story about a pool of sharks is a good place to start from )

Live Simulatordescription here

Inventory of problems :

a ) pickling lambdas ( and many other SER/DES blockers )

is easy - it is enough to conda install dill and import dill as pickle as dill can, for years pickle them - credit to @MikeMcKearns and your code does not need to refactor the use of the plain pickle.dumps()-call interface. So using pathos.multiprocess defaults to use dill internally, and this, for years known multiprocessing SER/DES weakness gets avoided.

b ) performance killers

- multiprocessing.Pool.map() is rather an End-to-End performance anti-pattern here - The Costs..., if we start not to neglect them, show, how much CPU-clocks & blocked physical-RAM-I/O transfers are paid for so many process-instantiations ( 60+ ), which finally "occupy" almost all physical CPU-cores, yet leaving thus almost zero-space for indeed high performance numpy-native multicore-computing of the core-problem ( for which the ultimate performance was expected to be boosted up, wasn't it? )

- just move the p-slider in the simulator to anything less than 100% ( having no [SERIAL]-part of the problem execution, which is nice in theory, yet never doable in practice, even the program launch is a pure-[SERIAL], by design )

- just move the Overhead-slider in the simulator to anything above a plain zero ( expressing a relative add-on cost of spawning a one of NCPUcores processes, as a number of percent, relative to the such [PARALLEL]-section part number of instructions - mathematically "dense" work has many such "useful"-instructions and may, supposing no other performance killers jump out of the box, may spends some reasonable amount of "add-on" costs, to spawn some amount of concurrent- or parallel-operations ( the actual number depends only on actual economy of costs, not on how many CPU-cores are present, the less on our "wishes" or scholastic or even worse copy/paste-"advice" ). On the contrary, mathematically "shallow" work has almost always "speedups" << 1 ( immense slow-downs ), as there is almost no chance to justify the known add-on costs ( paid on process-instantiations, data SER/xfer/DES moving in (params) and back (results) )

- next move the Overhead-slider in the simulator to the righmost edge == 1. This shows the case, when the actual process-spawning overhead-( a time lost )-costs are still not more than a just <= 1 % of all the computing-related instructions next, that are going to be performed for the "useful"-part of the work, that will be computed inside the such spawned process-instance. So even such 1:100 proportion factor ( doing 100x more "useful"-work than the lost CPU-time, burnt for arranging that many copies and making O/S-scheduler orchestrate concurrent execution thereof inside the available system Virtual-Memory ) has already all the warnings visible in the graph of the progression of Speedup-degradation - just play a bit with the Overhead-slider in the simulator, before touching the others...

- avoid a sin of "sharing" ( if performance is the goal ) - again, costs of operating such orchestration among several Python Interpreter processes, now independent, takes additional add-on costs, never justified in gaining performance boosted, as the fight for occupying shared resources ( CPU-cores, physical-RAM-I/O channels ) only devastates CPU-core-cache re-use hit-rates, O/S-scheduler operated process context-switches and all this further downgrades resulting End-to-End performance (which is something we do not want, do we?)

c ) boosting performance

- respect facts about the actual costs of any kind of computing operation
- avoid "shallow"-computing steps,
- maximise what gets so expensively into a set of distributed-processes (if a need remains so),
- avoid all overhead-adding operations (like adding local temporary variables, where inline operations permit to inplace store of partial results)
and
- try go into using the ultra-performant, cache-line friendly & optimised, native numpy-vectorised multicore & striding-tricks capabilities, not blocked by pre-overloaded CPU-cores by scheduling so many (~60) Python Interpreter process copies, each one trying to call numpy-code, thus not having any free cores to actually place such high-performance, cache-reuse-friendly vectorised computing onto (there we get-or-loose most of the performance, not in slow-running serial-iterators, not in spawning 60+ process-based full-copies of "__main__" Python Interpreter, before doing a single piece of the useful work on our great data, expensively RAM-allocated and physically copied 60+ times therein)

- refactoring of the real problem shall never go against a collected knowledge about performance as repeating the things that do not work will not bring any advantage, will it?
- respect your physical platform constraints, ignoring them will degrade your performance
- benchmark, profile, refactor
- benchmark, profile, refactor
- benchmark, profile, refactor
no other magic wand available here

and once already working on the bleeding edge of performance, set gc.disable() before you spawn the Python Interpreter into N-many replicated copies, not to wait for spontaneous garbage-collections when going for ultimate performance

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文