
Using IPython parallel for interactive parallel computing

Published 2025-02-25 23:44:04

Start a cluster of workers using the IPython notebook interface. Alternatively, enter

ipcluster start -n 4

at the command line.

from IPython.parallel import Client, interactive

Direct view

rc = Client()
print rc.ids
[0, 1, 2, 3]
dv = rc[:]

When a cell is marked with %%px, all commands in that cell are executed on all engines simultaneously; the line magic %px does the same for a single statement. We'll use it to load numpy on all engines.

%px import numpy as np

We can refer to individual engines using indexing and slice notation on the client, for example, to set random seeds.

for i, r in enumerate(rc):
    r.execute('np.random.seed(123)')
%%px

np.random.random(3)
Out[0:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[1:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[2:2]: array([ 0.69646919,  0.28613933,  0.22685145])
Out[3:2]: array([ 0.69646919,  0.28613933,  0.22685145])
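The engines return identical draws because they were all given the same seed. A minimal local sketch of the same effect, using two independently constructed NumPy generators (assuming NumPy is installed):

```python
import numpy as np

# Two generators seeded with the same value produce identical streams,
# which is why every engine above returned the same three numbers.
a = np.random.RandomState(123).random_sample(3)
b = np.random.RandomState(123).random_sample(3)
print(a)  # the same values the engines printed above
assert (a == b).all()
```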

Another way to do this is via the scatter operation.

dv.scatter('seed', [1,1,2,2], block=True)
dv['seed']
[[1], [1], [2], [2]]
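scatter splits the sequence into contiguous, roughly equal chunks, one per engine. A minimal pure-Python sketch of that partitioning rule (the helper name `partition` is my own, not part of the IPython.parallel API):

```python
def partition(seq, n):
    # Split seq into n roughly equal contiguous chunks,
    # mimicking how DirectView.scatter distributes data to engines.
    q, r = divmod(len(seq), n)
    chunks, start = [], 0
    for i in range(n):
        size = q + (1 if i < r else 0)  # early chunks absorb the remainder
        chunks.append(seq[start:start + size])
        start += size
    return chunks

print(partition([1, 1, 2, 2], 4))  # [[1], [1], [2], [2]]
```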
%%px

np.random.seed(seed)
np.random.random(3)
Out[0:3]: array([ 0.13436424,  0.84743374,  0.76377462])
Out[1:3]: array([ 0.13436424,  0.84743374,  0.76377462])
Out[2:3]: array([ 0.95603427,  0.94782749,  0.05655137])
Out[3:3]: array([ 0.95603427,  0.94782749,  0.05655137])

We set them to different seeds again to do the Monte Carlo integration.

for i, r in enumerate(rc):
    r.execute('np.random.seed(%d)' % i)
%%px

np.random.random(3)
Out[0:4]: array([ 0.5488135 ,  0.71518937,  0.60276338])
Out[1:4]: array([  4.17022005e-01,   7.20324493e-01,   1.14374817e-04])
Out[2:4]: array([ 0.4359949 ,  0.02592623,  0.54966248])
Out[3:4]: array([ 0.5507979 ,  0.70814782,  0.29090474])

We can collect the individual results of remote computation using a dictionary lookup syntax, or use gather to concatenate the results.

%%px

x = np.random.random(3)
dv['x']
[array([ 0.5449,  0.4237,  0.6459]),
 array([ 0.3023,  0.1468,  0.0923]),
 array([ 0.4353,  0.4204,  0.3303]),
 array([ 0.5108,  0.8929,  0.8963])]
dv.gather('x', block=True)
array([ 0.5449,  0.4237,  0.6459,  0.3023,  0.1468,  0.0923,  0.4353,
        0.4204,  0.3303,  0.5108,  0.8929,  0.8963])

Finding \(\pi\) simply involves generating random uniforms on each processor.

%%px
n = int(1e7)
x = np.random.uniform(-1, 1, (n, 2))
n = (x[:, 0]**2 + x[:, 1]**2 < 1).sum()
%precision 8
ns = dv['n']
4*np.sum(ns)/(1e7*len(rc))
3.14143780
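For comparison, the same estimator on a single process; `mc_pi` is a hypothetical helper written for this sketch, not part of IPython.parallel (assumes NumPy is installed):

```python
import numpy as np

def mc_pi(n, seed=0):
    # Estimate pi by sampling n points uniformly in the square [-1, 1]^2
    # and counting the fraction that land inside the unit circle.
    rng = np.random.RandomState(seed)
    pts = rng.uniform(-1, 1, (n, 2))
    inside = (pts[:, 0]**2 + pts[:, 1]**2 < 1).sum()
    return 4.0 * inside / n

print(mc_pi(100000))  # close to 3.14159...
```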

In blocking mode (the default), operations on remote engines do not return until all computations are done. For long computations this may be undesirable, so we can ask the engine to return immediately by using a non-blocking operation. In that case, what is returned is an Async-type object, which we can query for whether the computation is complete and, if so, retrieve the data from it.

dv.scatter('s', np.arange(16), block=False)
<AsyncResult: scatter>
dv['s']
[array([0, 1, 2, 3]),
 array([4, 5, 6, 7]),
 array([ 8,  9, 10, 11]),
 array([12, 13, 14, 15])]
dv.gather('s')
<AsyncMapResult: gather>
dv.gather('s').get()
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15])
ar = dv.map_async(lambda x: x+1, range(10))
ar.ready()
False
ar.ready()
False
ar.get()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
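The same non-blocking pattern exists in the standard library: concurrent.futures returns Future objects that play the role of AsyncResult, with done() corresponding to ready() and result() to get(). A minimal local sketch:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as ex:
    # submit returns immediately with a Future, like map_async's AsyncResult
    futures = [ex.submit(lambda x: x + 1, i) for i in range(10)]
    # result() blocks until that task's value is available, like get()
    results = [f.result() for f in futures]

print(results)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```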

Sometimes the tasks are very unbalanced - some may complete in a short time, while others may take a long time. In this case, using a load_balanced view is more efficient, as it avoids the risk that a single engine is allocated all the long-running tasks.

lv = rc.load_balanced_view()
def wait(n):
    import time
    time.sleep(n)
    return n

dv['wait'] = wait
intervals = [5,1,1,1,1,1,1,1,1,1,1,1,1,5,5,5]
%%time

ar = dv.map(wait, intervals)
ar.get()
CPU times: user 2.75 s, sys: 723 ms, total: 3.47 s
Wall time: 16 s
%%time

ar = lv.map(wait, intervals, balanced=True)
ar.get()
CPU times: user 1.7 s, sys: 459 ms, total: 2.16 s
Wall time: 9.1 s
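A quick back-of-the-envelope check of those wall times: dv.map scatters contiguous chunks, so one engine receives all three trailing 5-second tasks, and the run takes as long as the slowest chunk.

```python
intervals = [5, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 5, 5, 5]
n_engines = 4
size = len(intervals) // n_engines

# Contiguous chunks, as DirectView.map assigns them.
per_engine = [sum(intervals[i*size:(i+1)*size]) for i in range(n_engines)]
print(per_engine)                # [8, 4, 4, 16]
print(max(per_engine))           # 16 -- matches the 16 s dv.map wall time
print(sum(intervals)/n_engines)  # 8.0 -- the ideal balanced wall time
```

The load-balanced run's 9.1 s sits close to that 8 s ideal, since tasks are handed out dynamically as engines become free.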

We need to %load_ext cythonmagic on every engine and compile the Cython extension on every engine. The simplest way is to do all of this in %%px cells.

%%px
def python_loop(xs):
    s = 0.0
    for i in range(len(xs)):
        s += xs[i]
    return s
%%px
%load_ext cythonmagic
%%px
%%cython

def cython_loop(double[::1] xs):
    n = xs.shape[0]
    cdef int i
    cdef double s = 0.0
    for i in range(n):
        s += xs[i]
    return s
%%time
%%px
xs = np.random.random(int(1e7))
s = python_loop(xs)
CPU times: user 900 ms, sys: 195 ms, total: 1.1 s
Wall time: 9.12 s
dv['s']
[4999255.51979800, 5001207.17286485, 5000816.40605527, 4999437.17107215]
%%time
%%px
xs = np.random.random(int(1e7))
s = cython_loop(xs)
CPU times: user 37.3 ms, sys: 7.5 ms, total: 44.8 ms
Wall time: 376 ms
dv['s']
[5000927.33063748, 4999180.32360687, 5000671.20938849, 4999140.47559244]
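Outside of Cython, the usual way to get a compiled loop for this reduction is NumPy itself; a quick local check (assuming NumPy is installed) that the pure-Python loop and the vectorized sum agree:

```python
import numpy as np

def python_loop(xs):
    # The pure-Python accumulation timed above.
    s = 0.0
    for i in range(len(xs)):
        s += xs[i]
    return s

xs = np.random.random(10000)
# np.sum runs the same reduction in compiled C, much like the Cython loop.
assert abs(python_loop(xs) - np.sum(xs)) < 1e-6
print("python_loop and np.sum agree")
```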
