我正在使用多处理池来updte矩阵值,但是值不更改

发布于 2025-01-21 08:43:29 字数 1609 浏览 1 评论 0原文

我有一个简单的函数,该功能采用矩阵“ H”和更多参数,并在矩阵的单列顶部添加了一些计算的向量。然后,我将该函数应用于矩阵的每一列 - 我有一个代码,该代码依次执行此功能,那里的一切都很好。但是,由于列的操作是独立的,所以我想并行进行。但是,当我将相同的函数应用于“ yuptrocessing.pool()”时,矩阵的值不会从初始值中变化。

Bellow使用顺序和并行实现的脚本。最后,矩阵“ H1”和“ H2”的值应该是相同的,但不是,实际上'H2'的值与开始时具有相同的值(也就是说,作为矩阵“ deltas” )。

我不是程序员,也没有多处处理库的经验,所以也许我在这里做一些愚蠢的事情...

from  multiprocessing import Pool
from multiprocessing import set_start_method
import time
import numpy as np
from functools import partial

def h_single_ctr(ctr,C,keys1,bs1,h):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    h[:,ctr] += (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    h[:,ctr] += (C[keys1[:,1][indices2]]).dot(bs1[indices2])

if __name__ == '__main__':
    m,n = 100,15000
    deltas = np.random.rand(n,m)
    C = np.random.rand(m)
    mbs = 150
    bs1 = np.random.rand(mbs,n)
    keys1 = np.random.randint(m,size=(mbs,2))
        
    # Sequential    
    tic = time.time()
    h1 = 0. + deltas
    for ctr in range(m):
        # Update each column of a matrix h1, using function h_single_ctr
        h_single_ctr(ctr,C,keys1,bs1,h1)
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))

    # Multiprocessing / Pool
    tic = time.time()
    h2 = 0. + deltas
    p = Pool(5)
    # Update each column of a matrix h2, using function h_single_ctr, in parallel
    p.map(partial(h_single_ctr,C=C,keys1=keys1,bs1=bs1,h=h2), range(m))
    p.close()
    p.join()
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    print(np.linalg.norm(h1-h2))

I have a simple function that takes a matrix 'h' and some more arguments, and adds some computed vector on top of a single column of the matrix. Then I apply that function for each column of a matrix - I have a code that does this sequentially and everything is fine there; but, since the column-wise operations are independent I want to do it in parallel. However, when I apply the same function with 'multiprocessing.Pool()', the values of the matrix don't change from the initial value.

Bellow goes a script with both sequential and parallel implementation. In the end, the values of matrices 'h1' and 'h2' should be the same, but they are not, and actually 'h2' has the same value that it had in the beginning (that is, as a matrix 'deltas').

I am not a programmer, and don't have much experience with multiprocessing library, so maybe I am doing something stupid here...

from  multiprocessing import Pool
from multiprocessing import set_start_method
import time
import numpy as np
from functools import partial

def h_single_ctr(ctr,C,keys1,bs1,h):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    h[:,ctr] += (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    h[:,ctr] += (C[keys1[:,1][indices2]]).dot(bs1[indices2])

if __name__ == '__main__':
    m,n = 100,15000
    deltas = np.random.rand(n,m)
    C = np.random.rand(m)
    mbs = 150
    bs1 = np.random.rand(mbs,n)
    keys1 = np.random.randint(m,size=(mbs,2))
        
    # Sequential    
    tic = time.time()
    h1 = 0. + deltas
    for ctr in range(m):
        # Update each column of a matrix h1, using function h_single_ctr
        h_single_ctr(ctr,C,keys1,bs1,h1)
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))

    # Multiprocessing / Pool
    tic = time.time()
    h2 = 0. + deltas
    p = Pool(5)
    # Update each column of a matrix h2, using function h_single_ctr, in parallel
    p.map(partial(h_single_ctr,C=C,keys1=keys1,bs1=bs1,h=h2), range(m))
    p.close()
    p.join()
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    print(np.linalg.norm(h1-h2))

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烟─花易冷 2025-01-28 08:43:29

您可以为每个过程创建一个新变量,然后将它们全部添加到全局h之上。请注意,您需要具有一维数组,而不是该过程中的矩阵。

from multiprocessing import Process, Array
from threading import Thread
import time
import numpy as np

def h_single_ctr(ctr,C,keys1,bs1,h):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    h[:,ctr] += (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    h[:,ctr] += (C[keys1[:,1][indices2]]).dot(bs1[indices2])
    
def h_single_ctr2(ctr,C,keys1,bs1):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    res = (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    res += (C[keys1[:,1][indices2]]).dot(bs1[indices2])
    return res
def h_multiple_ctr(n,C,ctr_list,keys1,bs1,h):
    for i in range(len(ctr_list)):
        ctr = ctr_list[i]
        res = h_single_ctr2(ctr,C,keys1,bs1)
        h[(i*n):((i+1)*n)] += res

if __name__ == '__main__':
    m,n = 100,15000
    deltas = np.random.rand(n,m)
    C = np.random.rand(m)
    mbs = 150
    bs1 = np.random.rand(mbs,n)
    keys1 = np.random.randint(m,size=(mbs,2))
    
    num_processes = 3
    col_processes = [[i for i in range(j,m,num_processes)] for j in range(num_processes)] # column indices that each process will take

    # Sequential    
    tic = time.time()
    h1 = 0. + deltas
    for ctr in range(m):
        # Update each column of a matrix h1, using function h_single_ctr
        h_single_ctr(ctr,C,keys1,bs1,h1)
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    # Multiple processes
    tic = time.time()
    h2 = 0. + deltas
    for pid in range(num_processes):
        hi = Array('d', [0]*(len(col_processes[pid])*n), lock=False)
        p = Process(target=h_multiple_ctr, args=(n,C,col_processes[pid],keys1,bs1,hi))
        p.start()
        p.join()
        h2[:,col_processes[pid]] += np.reshape(hi,(len(col_processes[pid]),n)).T
        
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    print(np.linalg.norm(h1-h2))

You can create a new variable for each process and then add them all on top of your global h. Notice that you need to have a one-dimensional array, and not a matrix within the process.

from multiprocessing import Process, Array
from threading import Thread
import time
import numpy as np

def h_single_ctr(ctr,C,keys1,bs1,h):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    h[:,ctr] += (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    h[:,ctr] += (C[keys1[:,1][indices2]]).dot(bs1[indices2])
    
def h_single_ctr2(ctr,C,keys1,bs1):
    indices1 = np.where(keys1[:,1]==ctr)[0]
    indices2 = np.where(keys1[:,0]==ctr)[0]
    res = (C[keys1[:,0][indices1]]).dot(bs1[indices1])
    res += (C[keys1[:,1][indices2]]).dot(bs1[indices2])
    return res
def h_multiple_ctr(n,C,ctr_list,keys1,bs1,h):
    for i in range(len(ctr_list)):
        ctr = ctr_list[i]
        res = h_single_ctr2(ctr,C,keys1,bs1)
        h[(i*n):((i+1)*n)] += res

if __name__ == '__main__':
    m,n = 100,15000
    deltas = np.random.rand(n,m)
    C = np.random.rand(m)
    mbs = 150
    bs1 = np.random.rand(mbs,n)
    keys1 = np.random.randint(m,size=(mbs,2))
    
    num_processes = 3
    col_processes = [[i for i in range(j,m,num_processes)] for j in range(num_processes)] # column indices that each process will take

    # Sequential    
    tic = time.time()
    h1 = 0. + deltas
    for ctr in range(m):
        # Update each column of a matrix h1, using function h_single_ctr
        h_single_ctr(ctr,C,keys1,bs1,h1)
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    # Multiple processes
    tic = time.time()
    h2 = 0. + deltas
    for pid in range(num_processes):
        hi = Array('d', [0]*(len(col_processes[pid])*n), lock=False)
        p = Process(target=h_multiple_ctr, args=(n,C,col_processes[pid],keys1,bs1,hi))
        p.start()
        p.join()
        h2[:,col_processes[pid]] += np.reshape(hi,(len(col_processes[pid]),n)).T
        
    toc = time.time()    
    print('Done in {:.4f} seconds'.format(toc-tic))
    
    print(np.linalg.norm(h1-h2))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文