为什么在使用共享 numpy 数据的 for 循环内使用 python 多处理来解决令人尴尬的并行问题时没有加速?

发布于 2024-10-06 20:07:27 字数 3784 浏览 3 评论 0原文

我想加速一个与贝叶斯推理相关的令人尴尬的并行问题。目的是在给定矩阵 A 的情况下推断一组图像 x 的系数 u,使得 X = A*U。 X 的尺寸为 mxn、A mxp 和 U pxn。对于X的每一列,必须推断出系数U的最佳对应列。最后,该信息用于更新A。我使用m = 3000,p = 1500和n = 100。 因此,由于它是一个线性模型,系数矩阵 u 的推断由 n 个独立的计算组成。因此,我尝试使用Python的多处理模块,但没有加速。

这是我所做的:

主要结构(没有并行化)是:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

多处理的实现:

我尝试实现多处理。我有一台8核的机器可以用。

  1. 有 3 个 for 循环。唯一看起来“可并行”的是第三个,其中系数是推断出来的:
    • 生成一个队列并将迭代次数从 0 到 batch_size-1 放入队列中
    • 生成8个进程,并让它们通过队列工作
  2. 使用 multiprocessing.Array 共享数据 U

因此,我将第三个循环替换为以下:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

这是类 Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

这是函数 infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

现在的问题如下:

  1. 多处理版本并不比给定数据维度的单线程版本。
  2. 进程 ID 随着每次迭代而增加。这是否意味着不断有新的进程产生?这不是会产生巨大的开销吗?我怎样才能避免这种情况?是否有可能在整个 for 循环中创建 8 个不同的进程并用数据更新它们?
  3. 我在进程之间共享系数 U 的方式是否会减慢计算速度?还有另一种更好的方法吗?
  4. 进程池会更好吗?

我真的很感谢任何形式的帮助!我一个月前开始使用 Python,现在很迷茫。

恩金

I want to speed up an embarassingly parallel problem related to Bayesian Inference. The aim is to infer coefficents u for a set of images x, given a matrix A, such that X = A*U.
X has dimensions mxn, A mxp and U pxn. For each column of X, one has to infer the optimal corresponding column of the coefficients U. In the end, this information is used to update A. I use m = 3000, p = 1500 and n = 100.
So, as it is a linear model, the inference of the coefficient-matrix u consists of n independent calculations. Thus, I tried to work with the multiprocessing module of Python, but there is no speed up.

Here is what I did:

The main structure, without parallelization, is:

import numpy as np
from convex import Crwlasso_cd

S = np.empty((m, batch_size))

for t in xrange(start_iter, niter):

    ## Begin Warm Start ##
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf.
    for ws in range(5):
        # Initialize the coefficients
        if ws:
            theta = U
        else:
            theta = np.dot(A.T, X)

        # Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
        # Crwlasso_cd is the function that does the inference per data sample
        # It's basically a C-inline code
        for k in range(batch_size):
            U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())

        # Given the inferred coefficients, update and renormalize
        # the basis functions A 
        dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
        A += (eta / batch_size) * dA1
        A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))

Implementation of multiprocessing:

I tried to implement multiprocessing. I have an 8-core machine that I can use.

  1. There are 3 for-loops. The only one that seems to be "parallelizable" is the third one, where the coefficients are inferred:
    • Generate a Queue and stack the iteration-numbers from 0 to batch_size-1 into the Queue
    • Generate 8 processes, and let them work through the Queue
  2. Share the data U using multiprocessing.Array

So, I replaced this third loop with the following:

from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty

num_cpu = mp.cpu_count()
work_queue = Queue()

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)

...

        # Within the for-loops:
        for p in xrange(batch_size):
        work_queue.put(p)

        processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]

        for p in processes:
            p.start()
            print p.pid
        for p in processes:
            p.join()

Here is the class Wrap_mp:

class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
    processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""

    def __init__(self, arr):
        """ Initialize a shared array from a numpy array.

            The data is copied.
        """
        self.data = ndarray_to_shmem(arr)
        self.dtype = arr.dtype
        self.shape = arr.shape

    def __array__(self):
        """ Implement the array protocole.
        """
        arr = shmem_as_ndarray(self.data, dtype=self.dtype)
        arr.shape = self.shape
        return arr

    def asarray(self):
        return self.__array__()

And here is the function infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X):

    while True:
        try:
            index = work_queue.get(block=False)
            x = X[:,index]
            U = U_mp.asarray()
            theta = np.dot(phit,x)

            # Infer the coefficients of the column index
            U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())

         except Empty:
            break

The problem now are the following:

  1. The multiprocessing version is not faster than the single thread version for the given dimensions of the data.
  2. The process ID's increase with every iteration. Does this mean that there is constantly a new process generated? Doesn't this generate a huge overhead? How can I avoid that? Is there a possibility of creating within the whole for-loop 8 different processes and just update them with the data?
  3. Does the way I share the coefficients U amongst the processes slow the calculation down? Is there another, better way of doing this?
  4. Would a Pool of processes be better?

I am really thankful for any sort of help! I have started working with Python a month ago, and am pretty lost now.

Engin

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

长发绾君心 2024-10-13 20:07:27

每次创建流程时,您都在创建一个新流程。如果您在 for 循环中执行此操作,那么是的,您每次通过循环都会启动新进程。听起来您想要做的是在循环外初始化队列和进程,然后填充循环内的队列。

我之前使用过 multiprocessing.Pool,它很有用,但它并没有提供比您已经使用队列实现的功能更多的功能。

Every time you create a Process you are creating a new process. If you're doing that within your for loop, then yes, you are starting new processes every time through the loop. It sounds like what you want to do is initialize your Queue and Processes outside of the loop, then fill the Queue inside the loop.

I've used multiprocessing.Pool before, and it's useful, but it doesn't offer much over what you've already implemented with a Queue.

无可置疑 2024-10-13 20:07:27

最终,这一切都归结为一个问题:是否可以在主 for 循环之外启动进程,并且对于每次迭代,将更新的变量输入其中,让它们处理数据,并从所有循环中收集新计算的数据流程,而不必每次迭代都启动新流程?

Eventually, this all boils down to one question: Is it possible to start processes outside of the main for-loop, and for every iteration, feed the updated variables in them, have them processing the data, and collecting the newly calculated data from all of the processes, without having to start new processes every iteration?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文