Leveraging copy-on-write to copy data to multiprocessing.Pool() worker processes

I have some multiprocessing Python code that looks a bit like this:

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # basically a big memory struct

    def my_multithreaded_analysis(self):
        arg_lists = [(self, i) for i in range(10)]
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print(result)

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(args):
    my_instance, i = args
    return my_instance.analyze(i)


if __name__ == '__main__':
    my_instance = MyClass()
    my_instance.my_multithreaded_analysis()

After reading answers about how memory works in other StackOverflow questions, such as this one: Python multiprocessing memory usage, I was under the impression that memory use would not scale with the number of pool processes, since fork is copy-on-write and I have not modified any of the attributes of my_instance. However, when I run top, most of my processes show high memory usage (this is top output from OSX, but I can replicate it on Linux).

My question is basically: am I interpreting this correctly, in that my instance of MyClass is actually duplicated across the pool? If so, how can I prevent this? Should I just not use a construction like this? My goal is to reduce the memory used by a computational analysis.

PID   COMMAND      %CPU  TIME     #TH    #WQ  #PORT MEM    PURG   CMPRS  PGRP PPID STATE
2494  Python       0.0   00:01.75 1      0    7     765M   0B     0B     2484 2484 sleeping
2493  Python       0.0   00:01.85 1      0    7     765M   0B     0B     2484 2484 sleeping
2492  Python       0.0   00:01.86 1      0    7     765M   0B     0B     2484 2484 sleeping
2491  Python       0.0   00:01.83 1      0    7     765M   0B     0B     2484 2484 sleeping
2490  Python       0.0   00:01.87 1      0    7     765M   0B     0B     2484 2484 sleeping
2489  Python       0.0   00:01.79 1      0    7     167M   0B     597M   2484 2484 sleeping
2488  Python       0.0   00:01.77 1      0    7     10M    0B     755M   2484 2484 sleeping
2487  Python       0.0   00:01.75 1      0    7     8724K  0B     756M   2484 2484 sleeping
2486  Python       0.0   00:01.78 1      0    7     9968K  0B     755M   2484 2484 sleeping
2485  Python       0.0   00:01.74 1      0    7     171M   0B     594M   2484 2484 sleeping
2484  Python       0.1   00:16.43 4      0    18    775M   0B     12K    2484 2235 sleeping

2 Answers

薄暮涼年 2025-02-19 19:37:23

Alternatively, to take advantage of forking's copy-on-write benefits, while preserving some semblance of encapsulation, you could leverage class-attributes and @classmethods over pure globals.

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):

    myAttribute = np.zeros(100000000) # basically a big memory struct
    # myAttribute is a class-attribute

    @classmethod
    def my_multithreaded_analysis(cls):
        arg_list = list(range(10))
        pool = Pool(processes=10)
        # Pass the bound classmethod; pickling it sends only a reference
        # to MyClass, not the big class attribute.
        result = pool.map(cls.analyze, arg_list)
        print(result)

    @classmethod
    def analyze(cls, i):
        time.sleep(10)
        # If you wanted, you could access cls.myAttribute w/o worry here.
        return i ** 2

""" We don't need this proxy step !
    def call_method(args):
        my_instance, i = args
        return my_instance.analyze(i)
"""

if __name__ == '__main__':
    my_instance = MyClass()
    # Note that now you can instantiate MyClass anywhere in your app,
    # while still taking advantage of copy-on-write forking
    my_instance.my_multithreaded_analysis()

Note 1: Yes, I admit that class attributes and classmethods are glorified globals. But they buy a bit of encapsulation...

Note 2: Rather than explicitly creating your arg_lists as above, you could implicitly pass the instance (self) to each task created by Pool by handing the bound instance method analyze to Pool.map(), and shoot yourself in the foot even more easily, since pickling a bound method pickles the instance along with it (sketched below)!
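To make that foot-gun concrete, here is a minimal sketch of the anti-pattern (illustrative, not code from the original answer): because my_instance.analyze is a bound method, pickling it for each task also pickles my_instance, so the ~800 MB array is serialized and shipped to the workers instead of being shared copy-on-write.

import time
from multiprocessing import Pool
import numpy as np

class MyClass(object):
    def __init__(self):
        self.myAttribute = np.zeros(100000000) # big instance attribute (~800 MB)

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

if __name__ == '__main__':
    my_instance = MyClass()
    pool = Pool(processes=10)
    # Anti-pattern: pickling the bound method drags self along with it,
    # so every task pays to serialize and rebuild the big attribute.
    result = pool.map(my_instance.analyze, range(10))
    print(result)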

守不住的情 2025-02-19 19:37:22

Anything sent to pool.map (and related methods) isn't actually using shared copy-on-write resources. The values are "pickled" (Python's serialization mechanism), sent over pipes to the worker processes, and unpickled there, which reconstructs the object in the child from scratch. Thus, each child in this case ends up with a copy-on-write version of the original data (which it never uses, because it was told to use the copy sent via IPC), plus a personal recreation of the original data, rebuilt in the child and not shared.
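You can watch this serialization happen. Here is a small illustrative sketch (Traced and work are hypothetical names, not from the original answer): a class that reports whenever pickle serializes it, showing that Pool.map pickles the argument for every task it dispatches.

import os
from multiprocessing import Pool

class Traced(object):
    def __getstate__(self):
        # Called by pickle; fires once for every task that carries this object.
        print(f'pickling Traced in pid {os.getpid()}')
        return self.__dict__

def work(obj):
    return os.getpid()

if __name__ == '__main__':
    t = Traced()
    pool = Pool(processes=2)
    # Each of the four tasks pickles t in the parent and rebuilds it in a
    # worker; nothing about t is shared copy-on-write.
    print(pool.map(work, [t, t, t, t]))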

If you want to take advantage of forking's copy-on-write benefits, you can't send the data (or objects referencing the data) over the pipe. You have to store it somewhere the child can find it through its own globals. For example:

import os
import time
from multiprocessing import Pool

class MyClass(object):
    def __init__(self):
        self.myAttribute = os.urandom(1024*1024*1024) # basically a big memory struct (~1 GB)

    def my_multithreaded_analysis(self):
        arg_lists = list(range(10))  # Don't pass self
        pool = Pool(processes=10)
        result = pool.map(call_method, arg_lists)
        print(result)

    def analyze(self, i):
        time.sleep(10)
        return i ** 2

def call_method(i):
    # Implicitly use the global my_instance, not a copy passed as an argument
    return my_instance.analyze(i)

# Constructed globally and unconditionally, so the instance exists
# prior to forking, in a commonly accessible location
my_instance = MyClass()


if __name__ == '__main__':
    my_instance.my_multithreaded_analysis()

By not passing self, you avoid making copies, and just use the single global object that was copy-on-write mapped into the child. If you need more than one object, you can build a global list or dict mapping keys to instances of the object before creating the pool, then pass the index or key that looks up the object as part of the argument(s) to pool.map. The worker function then uses the index/key (which had to be pickled and sent to the child over IPC) to look up the value in the global dict (both copy-on-write mapped), so you copy cheap lookup information instead of the expensive data; a sketch of this pattern follows.
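Here is a minimal sketch of that pattern (BigObject, instances, and worker are hypothetical names, not from the original answer; it assumes the fork start method): the expensive objects live in a module-level dict built before the pool is created, and only the short keys cross the pipe.

import os
from multiprocessing import Pool

class BigObject(object):
    def __init__(self, seed):
        self.blob = os.urandom(64 * 1024 * 1024)  # ~64 MB per instance
        self.seed = seed

# Built before the Pool is created, so forked children inherit it copy-on-write.
instances = {name: BigObject(i) for i, name in enumerate(['a', 'b', 'c'])}

def worker(key):
    # Only the short key was pickled and sent over IPC; the big object is
    # found through the child's inherited (copy-on-write) globals.
    obj = instances[key]
    return key, obj.seed, len(obj.blob)

if __name__ == '__main__':
    pool = Pool(processes=3)
    print(pool.map(worker, ['a', 'b', 'c', 'a']))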

If the objects are smallish, they'll end up copied even if you never write to them. CPython is reference counted, and the reference count lives in the common object header, where it is updated constantly merely by referring to the object, even when the reference is logically non-mutating. So small objects (and all the other objects allocated in the same page of memory) get written to, and therefore copied. For large objects (your hundred-million-element numpy array), most of the memory remains shared as long as you don't write to it, since the header occupies only the first of many pages.
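A quick way to see the mechanism (an illustrative sketch, not from the original answer): sys.getrefcount shows the count stored in the object's own header changing merely because another name refers to it, and in a forked child that counts as a write to the page.

import sys

x = object()
print(sys.getrefcount(x))  # baseline (getrefcount itself adds a temporary ref)
y = x                      # a logically non-mutating reference...
print(sys.getrefcount(x))  # ...still wrote to x's header to bump the count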

Changed in Python 3.8: on macOS, the spawn start method is now the default; see the multiprocessing docs. Spawn does not leverage copy-on-write.
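So on macOS with Python 3.8+ you must request fork explicitly if you want these copy-on-write tricks to apply. A minimal sketch (with the caveat that fork on macOS can be unsafe once certain system frameworks have been initialized):

import multiprocessing

def worker(i):
    return i ** 2

if __name__ == '__main__':
    # The default on macOS 3.8+ is 'spawn', which re-imports the module in
    # each child instead of inheriting the parent's memory copy-on-write.
    ctx = multiprocessing.get_context('fork')
    with ctx.Pool(processes=4) as pool:
        print(pool.map(worker, range(10)))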
