Parallel nested for loops in IPython

Published 2025-01-07 21:11:05


I have a nested for loop in my python code that looks something like this:

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Do various bits of stuff
        # Eventually get a result
        results.append(result)

I'd like to parallelise this loop on my 4 core machine to speed it up. Looking at the IPython parallel programming documentation (http://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html#quick-and-easy-parallelism) it seems that there is an easy way to use map to parallelise iterative operations.

However, to do that I need to have the code inside the loop as a function (which is easy to do), and then map across this function. The problem I have is that I can't get an array to map this function across. itertools.product() produces an iterator which I can't seem to use the map function with.

Am I barking up the wrong tree by trying to use map here? Is there a better way to do it? Or is there some way to use itertools.product and then do parallel execution with a function mapped across the results?


Comments (5)

北方。的韩爷 2025-01-14 21:11:05


To parallelize every call, you just need to get a list for each argument. You can use itertools.product + zip to get this:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

Then you can use map:

amr = dview.map(f, allzeniths, allazimuths)

To go a bit deeper into the steps, here's an example:

zeniths = range(1,4)
azimuths = range(6,8)

product = list(itertools.product(zeniths, azimuths))
# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]

So we have a "list of pairs", but what we really want is a single list for each argument, i.e. a "pair of lists". This is exactly what the slightly weird zip(*product) syntax gets us:

allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

print(allzeniths)
# (1, 1, 2, 2, 3, 3)
print(allazimuths)
# (6, 7, 6, 7, 6, 7)

Now we just map our function onto those two lists, to parallelize nested for loops:

def f(z, a):
    return z * a

dview.map(f, allzeniths, allazimuths)

And there's nothing special about there being only two - this method should extend to an arbitrary number of nested loops.
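The flattening step can be checked locally by using the builtin `map` as a serial stand-in for `dview.map` (a minimal sketch; the `zeniths`/`azimuths` values and the trivial `f` are just illustrative):

```python
import itertools

zeniths = range(1, 4)
azimuths = range(6, 8)

# Flatten the nested loop into one argument sequence per parameter.
allzeniths, allazimuths = zip(*itertools.product(zeniths, azimuths))

def f(z, a):
    return z * a

# The builtin map accepts multiple argument sequences, just like dview.map.
results = list(map(f, allzeniths, allazimuths))
print(results)  # [6, 7, 12, 14, 18, 21]
```

Swapping in `dview.map` should then distribute exactly the same call list across the engines.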

2025-01-14 21:11:05


I assume you are using IPython 0.11 or later. First of all, define a simple function.

def foo(azimuth, zenith):
    # Do various bits of stuff
    # Eventually get a result
    return result

Then use IPython's parallel suite to parallelize your problem. First, start a controller with 5 engines attached (#CPUs + 1) by launching a cluster in a terminal window (this program should be present if you installed IPython 0.11 or later):

ipcluster start -n 5

In your script connect to the controller and transmit all your tasks. The controller will take care of everything.

from IPython.parallel import Client

c = Client()   # here is where the client establishes the connection
lv = c.load_balanced_view()   # this object represents the engines (workers)

tasks = []
for azimuth in azimuths:
    for zenith in zeniths:
        tasks.append(lv.apply(foo, azimuth, zenith))

result = [task.get() for task in tasks]  # blocks until all results are back
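The same submit-then-collect pattern can be tried locally without an IPython cluster, using `concurrent.futures` as a stand-in for the load-balanced view (a sketch; `foo` here is a trivial placeholder, and `pool.submit` plays the role of `lv.apply`):

```python
from concurrent.futures import ThreadPoolExecutor

azimuths = [0, 90]
zeniths = [10, 20, 30]

def foo(azimuth, zenith):
    # Placeholder for the real computation.
    return azimuth + zenith

tasks = []
with ThreadPoolExecutor(max_workers=4) as pool:
    for azimuth in azimuths:
        for zenith in zeniths:
            # submit returns a future, just like lv.apply.
            tasks.append(pool.submit(foo, azimuth, zenith))

result = [task.result() for task in tasks]  # blocks until all results are back
print(result)  # [10, 20, 30, 100, 110, 120]
```

The results come back in submission order, so the nested-loop structure of the output is preserved.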
jJeQQOZ5 2025-01-14 21:11:05


I'm not really familiar with IPython, but an easy solution would seem to be to parallelize the outer loop only.

def f(azimuth):
    results = []
    for zenith in zeniths:
        #compute result
        results.append(result)
    return results

allresults = map(f, azimuths)
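Note that this version returns one list per azimuth, so the output usually needs flattening afterwards. A serial sketch with placeholder values:

```python
import itertools

zeniths = [1, 2, 3]
azimuths = [10, 20]

def f(azimuth):
    results = []
    for zenith in zeniths:
        results.append(azimuth * zenith)  # placeholder computation
    return results

# map returns one list of results per azimuth...
allresults = list(map(f, azimuths))
print(allresults)  # [[10, 20, 30], [20, 40, 60]]

# ...which can be flattened into a single list if needed.
flat = list(itertools.chain.from_iterable(allresults))
print(flat)  # [10, 20, 30, 20, 40, 60]
```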
我的黑色迷你裙 2025-01-14 21:11:05


If you actually want to run your code in parallel, use concurrent.futures:

import itertools
import concurrent.futures

def _work_horse(azimuth, zenith):
    #DO HEAVY WORK HERE
    return result

futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
    # product(azimuths, zeniths) matches the argument order of _work_horse.
    for arg_set in itertools.product(azimuths, zeniths):
        futures.append(executor.submit(_work_horse, *arg_set))
# Leaving the with-block already calls executor.shutdown(wait=True).

# Will time out after one hour.
results = [future.result(3600) for future in futures]
梦屿孤独相伴 2025-01-14 21:11:05


If you want to keep the structure of your loop, you can try using Ray (docs), which is a framework for writing parallel and distributed Python. The one requirement is that you would have to separate out the work that can be parallelized into its own function.

You can import Ray like this:

import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init()

Then, your script would look like this:

# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `work` tasks automatically.
@ray.remote
def work(azimuth, zenith):
  # Do various bits of stuff
  # Eventually get a result
  return result

results = []

for azimuth in azimuths:
    for zenith in zeniths:
        # Store a future, which represents the future result of `work`.
        results.append(work.remote(azimuth, zenith))

# Block until the results are ready with `ray.get`.
results = ray.get(results)