Python多进程:运行几个类的实例,将所有子进程保存在记忆中

发布于 2025-02-10 19:37:30 字数 3122 浏览 1 评论 0原文

首先,我要感谢Stackoverflow社区多年来为我提供的巨大帮助,而不必问一个问题。 我找不到与我的问题有关的任何东西,尽管这可能是由于我对这个主题的了解不足,而不是网站上没有回应。如果这是重复的,我会事先表示歉意。

我对多流程相对较新;一段时间以前,我成功地以一种非常简单的方式使用了多处理。在儿童过程之间我不需要任何反馈。 现在,我面临着一个更复杂的问题,而我只是迷失了有关多处理的文档。因此,我要求您提供帮助,您的好意和耐心。

我正在尝试从一堂课中构建一种平行的回火蒙特卡洛算法。

基本类的大致如下:

import numpy as np

class monte_carlo:

    def __init__(self):
        self.x=np.ones((1000,3))
        self.E=np.mean(self.x)
        self.Elist=[]

    def simulation(self,temperature):
        self.T=temperature
        for i in range(3000):
            self.MC_step()
            if i%10==0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x=self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1,1,3))
        temp_E=np.mean(self.x)
        if np.random.random()<np.exp((self.E-temp_E)/self.T):
            self.E=temp_E
            self.x=x
        return

显然,我简化了很多(实际类长500行!),并为简单起见构建假函数:__ INT __ INT __将一堆参数作为参数,那里除了自我。关键点是,该类的每个实例都包含我想保留记忆的大量信息,并且我不想一遍又一遍地复制,以避免剧烈的减速。否则,我只会使用多处理。池模块。

现在,我想做的并行化,在伪代码:

def proba(dE,pT):
    return np.exp(-dE/pT)  
          
Tlist=[1.1,1.2,1.3]
N=len(Tlist)
G=[]
for _ in range(N):
    G.append(monte_carlo())

for _ in range(5):

    for i in range(N): # this loop should be ran in multiprocess
        G[i].simulation(Tlist[i])
    
    for i in range(N//2): 
        dE=G[i].E-G[i+1].E
        pT=G[i].T + G[i+1].T
        p=proba(dE,pT) # (proba is a function, giving a probability depending on dE)
        if np.random.random() < p: 
             T_temp = G[i].T
             G[i].T = G[i+1].T
             G[i+1].T = T_temp

综合:我想在并行子过程中运行我的蒙特卡洛类的几个实例,参数t的值不同,然后定期暂停所有内容以更改不同的t。并再次运行孩子的过程/班级实例,从他们停下来的地方。 这样做,我希望每个班级/儿童过程都能彼此独立,在暂停所有内部变量时将其当前状态保存,并尽可能少。最后一点至关重要,因为同类内部的数组很大(有些是1000x1000),因此,副本将很快变为时间表。

预先感谢,对不起,如果我不清楚...

编辑: 我正在使用许多(64)CPU的远处机器,在Debian GNU/Linux 10(Buster)上运行。

edit2: 我在原始帖子中犯了一个错误:最后,必须在班级现场之间交换温度,而不是在全局tlist之间进行交换。

EDIT3:Charchit答案在我的个人机器和通常用于运行代码的遥远机器上的测试代码非常有效。因此,我将其视为接受的答案。 但是,我想在这里报告,插入实际,更复杂的代码,而不是过度简化monte_carlo类,遥远的机器给了我一些奇怪的错误:

Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gtk-WARNING **: ##:##:##:###: Locale not supported by C library.
    Using the fallback 'C' locale.
Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: 

gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

“ ##:##:##:##:##:## :###“是(或看起来像)IP Adresses。 如果没有呼叫set_start_method('Spawn')此错误仅在一开始就显示一次,而当我使用此方法时,它似乎在每次出现result.get( ) ... 最奇怪的是,该代码似乎可以正常工作,不会崩溃,生成我要求的数据文件等...

我认为这应该发布一个新问题,但是如果有人,我仍然把它放在这里有一个快速的答案。 如果没有,我将求助于我的实际代码中存在但不在测试示例中的变量,方法等,以尝试查找错误的来源。我目前最好的猜测是,每个儿童过程所需的记忆空间都使用实际代码所需的记忆空间,由于管理员实施了一些限制,因此遥远的机器无法接受它。

First, I'd like to thank the StackOverflow community for the tremendous help it provided me over the years, without me having to ask a single question.
I could not find anything that I can relate to my problem, though it is probably due to my lack of understanding of the subject, rather than the absence of a response on the website. My apologies in advance if this is a duplicate.

I am relatively new to multiprocess; some time ago I succeeded in using multiprocessing.pools in a very simple way, where I didn't need any feedback between the child processes.
Now I am facing a much more complicated problem, and I am just lost in the documentation about multiprocessing. I hence ask for you help, your kindness and your patience.

I am trying to build a parallel tempering monte-carlo algorithm, from a class.

The basic class very roughly goes as follows:

import numpy as np

class monte_carlo:

    def __init__(self):
        self.x=np.ones((1000,3))
        self.E=np.mean(self.x)
        self.Elist=[]

    def simulation(self,temperature):
        self.T=temperature
        for i in range(3000):
            self.MC_step()
            if i%10==0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x=self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1,1,3))
        temp_E=np.mean(self.x)
        if np.random.random()<np.exp((self.E-temp_E)/self.T):
            self.E=temp_E
            self.x=x
        return

Obviously, I simplified a great deal (actual class is 500 lines long!), and built fake functions for simplicity: __init__ takes a bunch of parameters as arguments, there are many more lists of measurement else than self.Elist, and also many arrays derived from self.X that I use to compute them. The key point is that each instance of the class contains a lot of informations that I want to keep in memory, and that I don't want to copy over and over again, to avoid dramatic slowing down. Else I would just use the multiprocessing.pool module.

Now, the parallelization I want to do, in pseudo-code:

def proba(dE,pT):
    return np.exp(-dE/pT)  
          
Tlist=[1.1,1.2,1.3]
N=len(Tlist)
G=[]
for _ in range(N):
    G.append(monte_carlo())

for _ in range(5):

    for i in range(N): # this loop should be ran in multiprocess
        G[i].simulation(Tlist[i])
    
    for i in range(N//2): 
        dE=G[i].E-G[i+1].E
        pT=G[i].T + G[i+1].T
        p=proba(dE,pT) # (proba is a function, giving a probability depending on dE)
        if np.random.random() < p: 
             T_temp = G[i].T
             G[i].T = G[i+1].T
             G[i+1].T = T_temp

Synthesis: I want to run several instances of my monte-carlo class in parallel child processes, with different values for a parameter T, then periodically pause everything to change the different T's, and run again the child processes/class instances, from where they paused.
Doing this, I want each class-instance/child-process to stay independent from one another, save its current state with all internal variables while it is paused, and do as few copies as possible. This last point is critical, as the arrays inside the class are quite big (some are 1000x1000), and a copy will therefore very quickly become quite time-costly.

Thanks in advance, and sorry if I am not clear...

Edit:
I am using a distant machine with many (64) CPUs, running on Debian GNU/Linux 10 (buster).

Edit2:
I made a mistake in my original post: in the end, the temperatures must be exchanged between the class-instances, and not inside the global Tlist.

Edit3: Charchit answer works perfectly for the test code, on both my personal machine and the distant machine I am usually using for running my codes. I hence check this as the accepted answer.
However, I want to report here that, inserting the actual, more complicated code, instead of the oversimplified monte_carlo class, the distant machine gives me some strange errors:

Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gtk-WARNING **: ##:##:##:###: Locale not supported by C library.
    Using the fallback 'C' locale.
Unable to init server: Could not connect: Connection refused

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: 

gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

(CMC_temper_all.py:55509): Gdk-CRITICAL **: ##:##:##:###: gdk_cursor_new_for_display: assertion 'GDK_IS_DISPLAY (display)' failed

The "##:##:##:###" are (or seems like) IP adresses.
Without the call to set_start_method('spawn') this error shows only once, in the very beginning, while when I use this method, it seems to show at every occurrence of result.get()...
The strangest thing is that the code seems otherwise to work fine, does not crash, produces the datafiles I then ask it to, etc...

I think this would deserve to publish a new question, but I put it here nonetheless in case someone has a quick answer.
If not, I will resort to add one by one the variables, methods, etc... that are present in my actual code but not in the test example, to try and find the origin of the bug. My best guess for now is that the memory space required by each child-process with the actual code, is too large for the distant machine to accept it, due to some restrictions implemented by the admin.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

尬尬 2025-02-17 19:37:30

您正在寻找的是。根据文档,您可以创建共享内存,这对其可以存储的数据有限制,而不是线程安全,但可以提供更好的速度和性能;或者,您可以通过通过管理员。后者是我们要使用的,因为您想共享用户定义的数据类型的整个对象。请记住,使用经理会根据您通过和接收到的参数的复杂性,往返于托管对象的复杂性,会影响代码的速度。

经理,代理和腌制

如前所述,管理人员创建服务器进程以存储对象,并允许通过代理访问它们。我已经回答了一个问题,其中有一个更好的详细信息,以及如何创建合适的代理在这里。我们将使用链接答案中定义的相同代理,并使用一些变体。也就是说,我将__ getAttr __内的工厂功能替换为可以使用Pickle腌制的东西。这意味着您可以运行使用此代理创建的托管对象的实例方法,而无需求助于“ nofollow noreferrer”> supperrocess 。结果是此修改后的代理:

from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result

解决方案

现在我们只需要确保在创建monte_carlo的对象时,我们会使用经理和上述代理进行此操作。为此,我们创建一个名为创建的类构造函数。 monte_carlo的所有对象都应使用此功能创建。因此,最终代码看起来像这样:

from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result


class monte_carlo:

    def __init__(self, ):
        self.x = np.ones((1000, 3))
        self.E = np.mean(self.x)
        self.Elist = []
        self.T = None

    def simulation(self, temperature):
        self.T = temperature
        for i in range(3000):
            self.MC_step()
            if i % 10 == 0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x = self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1, 1, 3))
        temp_E = np.mean(self.x)
        if np.random.random() < np.exp((self.E - temp_E) / self.T):
            self.E = temp_E
            self.x = x
        return

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst


def proba(dE,pT):
    return np.exp(-dE/pT)


if __name__ == "__main__":
    Tlist = [1.1, 1.2, 1.3]
    N = len(Tlist)
    G = []

    # Create our managed instances
    for _ in range(N):
        G.append(monte_carlo.create())

    for _ in range(5):

        #  Run simulations in the manager server
        results = []
        with Pool(8) as pool:

            for i in range(N):  # this loop should be ran in multiprocess
                results.append(pool.apply_async(G[i].simulation, (Tlist[i], )))

            # Wait for the simulations to complete
            for result in results:
                result.get()

        for i in range(N // 2):
            dE = G[i].E - G[i + 1].E
            pT = G[i].T + G[i + 1].T
            p = proba(dE, pT)  # (proba is a function, giving a probability depending on dE)
            if np.random.random() < p:
                T_temp = Tlist[i]
                Tlist[i] = Tlist[i + 1]
                Tlist[i + 1] = T_temp

    print(Tlist)

符合您想要的标准。它根本不会创建任何副本,而是,对仿真方法调用的所有参数都在池中序列化并发送到实际存储对象的管理器服务器。它在那里执行,结果(如果有)在主过程中被序列化并返回。所有这些,仅使用内置!

输出

[1.2, 1.1, 1.3]

编辑

由于您正在使用Linux,我鼓励您使用 suptroProcessing.set_start_method 内部,如果__ -name__ ...子句将开始方法设置为“ spawn”。这样做将确保儿童过程无法访问子句中定义的变量。

What you are looking for is sharing state between processes. As per the documentation, you can either create shared memory, which is restrictive about the data it can store and is not thread-safe, but offers better speed and performance; or you can use server processes through managers. The latter is what we are going to use since you want to share whole objects of user-defined datatypes. Keep in mind that using managers will impact speed of your code depending on the complexity of the arguments that you pass and receive, to and from the managed objects.

Managers, proxies and pickling

As mentioned, managers create server processes to store objects, and allow access to them through proxies. I have answered a question with better details on how they work, and how to create a suitable proxy here. We are going to use the same proxy defined in the linked answer, with some variations. Namely, I have replaced the factory functions inside the __getattr__ to something that can be pickled using pickle. This means that you can run instance methods of managed objects created with this proxy without resorting to using multiprocess. The result is this modified proxy:

from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result

Solution

Now we only need to make sure that when we are creating objects of monte_carlo, we do so using managers and the above proxy. For that, we create a class constructor called create. All objects for monte_carlo should be created with this function. With that, the final code looks like this:

from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager
import types
import numpy as np


class A:
    def __init__(self, name, method):
        self.name = name
        self.method = method

    def get(self, *args, **kwargs):
        return self.method(self.name, args, kwargs)


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            return A(name, self._callmethod).get
        return result


class monte_carlo:

    def __init__(self, ):
        self.x = np.ones((1000, 3))
        self.E = np.mean(self.x)
        self.Elist = []
        self.T = None

    def simulation(self, temperature):
        self.T = temperature
        for i in range(3000):
            self.MC_step()
            if i % 10 == 0:
                self.Elist.append(self.E)
        return

    def MC_step(self):
        x = self.x.copy()
        k = np.random.randint(1000)
        x[k] = (x[k] + np.random.uniform(-1, 1, 3))
        temp_E = np.mean(self.x)
        if np.random.random() < np.exp((self.E - temp_E) / self.T):
            self.E = temp_E
            self.x = x
        return

    @classmethod
    def create(cls, *args, **kwargs):
        # Register class
        class_str = cls.__name__
        BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))
        # Start a manager process
        manager = BaseManager()
        manager.start()

        # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
        inst = eval("manager.{}(*args, **kwargs)".format(class_str))
        return inst


def proba(dE,pT):
    return np.exp(-dE/pT)


if __name__ == "__main__":
    Tlist = [1.1, 1.2, 1.3]
    N = len(Tlist)
    G = []

    # Create our managed instances
    for _ in range(N):
        G.append(monte_carlo.create())

    for _ in range(5):

        #  Run simulations in the manager server
        results = []
        with Pool(8) as pool:

            for i in range(N):  # this loop should be ran in multiprocess
                results.append(pool.apply_async(G[i].simulation, (Tlist[i], )))

            # Wait for the simulations to complete
            for result in results:
                result.get()

        for i in range(N // 2):
            dE = G[i].E - G[i + 1].E
            pT = G[i].T + G[i + 1].T
            p = proba(dE, pT)  # (proba is a function, giving a probability depending on dE)
            if np.random.random() < p:
                T_temp = Tlist[i]
                Tlist[i] = Tlist[i + 1]
                Tlist[i + 1] = T_temp

    print(Tlist)

This meets the criteria you wanted. It does not create any copies at all, rather, all arguments to the simulation method call are serialized inside the pool and sent to the manager server where the object is actually stored. It gets executed there, and the results (if any) are serialized and returned in the main process. All of this, with only using the builtins!

Output

[1.2, 1.1, 1.3]

Edit

Since you are using Linux, I encourage you to use multiprocessing.set_start_method inside the if __name__ ... clause to set the start method to "spawn". Doing this will ensure that the child processes do not have access to variables defined inside the clause.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文