How can I parallelize a list comprehension calculation in Python?

Both list comprehensions and map calculations should -- at least in theory -- be relatively easy to parallelize: each calculation inside a list comprehension can be done independently of the calculations for all the other elements. For example, in the expression

[ x*x for x in range(1000) ]

each x*x calculation could (at least in theory) be done in parallel.

My question is: is there any Python module, Python implementation, or Python programming trick to parallelize a list-comprehension calculation (in order to use all 16 / 32 / ... cores, or to distribute the calculation over a computer grid or a cloud)?

9 answers

醉城メ夜风 2024-10-28 01:15:31

As Ken said, it can't, but with 2.6's multiprocessing module, it's pretty easy to parallelize computations.

import multiprocessing

try:
    cpus = multiprocessing.cpu_count()
except NotImplementedError:
    cpus = 2   # arbitrary default


def square(n):
    return n * n

# Guard the pool creation so the module can be safely re-imported by the
# worker processes (required on platforms that spawn rather than fork).
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=cpus)
    print(pool.map(square, range(1000)))
    pool.close()
    pool.join()

There are also examples in the documentation that show how to do this using Managers, which should allow for distributed computations as well.
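
For instance, a minimal sketch of that Manager pattern (the address, port, and authkey below are placeholders, and the version in the docs adds more error handling):

from multiprocessing.managers import BaseManager
import queue

class QueueManager(BaseManager):
    pass

# --- server side: expose a queue of work items over the network ---
def run_server():
    task_queue = queue.Queue()
    for n in range(1000):
        task_queue.put(n)
    QueueManager.register('get_queue', callable=lambda: task_queue)
    manager = QueueManager(address=('', 50000), authkey=b'secret')
    manager.get_server().serve_forever()

# --- worker side (possibly on another machine) ---
def run_worker(host):
    QueueManager.register('get_queue')
    manager = QueueManager(address=(host, 50000), authkey=b'secret')
    manager.connect()
    q = manager.get_queue()
    while not q.empty():
        n = q.get()
        print(n * n)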

失去的东西太少 2024-10-28 01:15:31

For shared-memory parallelism, I recommend joblib:

from joblib import delayed, Parallel

NUM_CPUS = -1   # -1 tells joblib to use all available cores

def square(x):
    return x * x

values = Parallel(n_jobs=NUM_CPUS)(delayed(square)(x) for x in range(1000))

丘比特射中我 2024-10-28 01:15:31

On automatic parallelisation of list comprehensions

IMHO, effective automatic parallelisation of list comprehensions would be impossible without additional information (such as that provided by directives in OpenMP), or without limiting it to expressions that involve only built-in types/methods.

Unless there is a guarantee that the processing done on each list item has no side effects, there is a possibility that the results will be invalid (or at least different) if done out of order.

# Artificial example
counter = 0

def g(x): # func with side-effect
    global counter
    counter = counter + 1
    return x + counter

vals = [g(i) for i in range(100)] # diff result when not done in order

There is also the issue of task distribution. How should the problem space be decomposed?

If the processing of each element forms a task (~ a task farm), then when there are many elements each involving a trivial calculation, the overhead of managing the tasks will swamp the performance gains of parallelisation.

One could also take the data decomposition approach where the problem space is divided equally among the available processes.

The fact that list comprehensions also work with generators makes this slightly tricky; however, this is probably not a show-stopper if the overhead of pre-iterating the generator is acceptable. Of course, there is also the possibility of generators with side effects, which could change the outcome if subsequent items are prematurely iterated. Unlikely, but possible.

A bigger concern is load imbalance across processes. There is no guarantee that each element takes the same amount of time to process, so statically partitioned data may result in one process doing most of the work while the others idle.

Breaking the list down into smaller chunks and handing them out as each child process becomes available is a good compromise; however, a good choice of chunk size is application-dependent, and hence not doable without more information from the user (see the sketch below).
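
As an illustration of that trade-off, multiprocessing.Pool.map already exposes a chunksize parameter; the value used below is arbitrary and would need tuning per application:

import multiprocessing

def square(n):
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # Larger chunks reduce task-management overhead; smaller chunks
        # balance load better. 50 is an arbitrary starting point.
        results = pool.map(square, range(1000), chunksize=50)
    print(results[:5])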

Alternatives

As mentioned in several other answers, there are many approaches and parallel computing modules/frameworks to choose from, depending on one's requirements.

Having used only MPI (in C), with no experience using Python for parallel processing, I am not in a position to vouch for any (although, upon a quick scan through, multiprocessing, jug, pp and pyro stand out).

If a requirement is to stick as close as possible to list comprehension, then jug seems to be the closest match. From the tutorial, distributing tasks across multiple instances can be as simple as:

from glob import glob

from jug.task import Task
from yourmodule import process_data

tasks = [Task(process_data, infile) for infile in glob('*.dat')]

While that does something similar to multiprocessing.Pool.map(), jug can use different backends for synchronising processes and storing intermediate results (redis, filesystem, in-memory), which means the processes can span nodes in a cluster.

作死小能手 2024-10-28 01:15:31

As the above answers point out, this is actually pretty hard to do automatically. So I think the question really becomes: how to do it in the easiest way possible? Ideally, a solution wouldn't require you to know things like "how many cores do I have". Another property you might want is to be able to still write the list comprehension in a single readable line.

Some of the given answers already seem to have nice properties like this, but another alternative is Ray (docs), which is a framework for writing parallel Python. In Ray, you would do it like this:

import ray

# Start Ray. This creates some processes that can do work in parallel.
ray.init()

# Add this line to signify that the function can be run in parallel (as a
# "task"). Ray will load-balance different `square` tasks automatically.
@ray.remote
def square(x):
    return x * x

# Create some parallel work using a list comprehension, then block until the
# results are ready with `ray.get`.
ray.get([square.remote(x) for x in range(1000)])
内心荒芜 2024-10-28 01:15:31

Using the futures.{Thread,Process}PoolExecutor.map(func, *iterables, timeout=None) and futures.as_completed(future_instances, timeout=None) functions from the concurrent.futures package (new in 3.2) could help.

It's also available as a 2.6+ backport.
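
For instance, a minimal sketch using ProcessPoolExecutor (the worker function is illustrative):

from concurrent.futures import ProcessPoolExecutor

def square(n):
    return n * n

if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        # executor.map preserves input order, like the built-in map
        results = list(executor.map(square, range(1000)))
    print(results[:5])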

痞味浪人 2024-10-28 01:15:31

No, because a list comprehension is itself a sort of C-optimized macro. If you pull it out and parallelize it, then it's not a list comprehension any more; it's just good old-fashioned MapReduce.

But you can easily parallelize your example. Here's a good tutorial on using MapReduce with Python's parallelization library:

http://mikecvet.wordpress.com/2010/07/02/parallel-mapreduce-in-python/
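
As a rough sketch of that idea (not the tutorial's code): the "map" phase can go through a process pool while the "reduce" phase stays sequential.

import multiprocessing
from functools import reduce
from operator import add

def square(n):
    return n * n

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        squares = pool.map(square, range(1000))   # parallel "map" phase
    total = reduce(add, squares)                  # sequential "reduce" phase
    print(total)  # 332833500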

浪漫人生路 2024-10-28 01:15:31

You can use asyncio (documentation can be found at https://docs.python.org/3/library/asyncio.html). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus, it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio
import functools

def background(f):
    def wrapped(*args, **kwargs):
        # run_in_executor does not forward keyword arguments, so bind
        # them with functools.partial first
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))
    return wrapped

@background
def your_function(argument):
    ...  # your code here

Now this function will run in the background whenever it is called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: although the loop itself is sequential, each iteration is handed off to run alongside the main program as soon as the interpreter reaches it. (Since run_in_executor with None uses the default thread-pool executor, this shines for I/O-bound work such as sleeping or network calls; CPU-bound functions remain subject to the GIL.)

For your specific case, you can do:

import asyncio
import functools
import time


def background(f):
    def wrapped(*args, **kwargs):
        # Bind arguments with functools.partial, since run_in_executor
        # does not forward keyword arguments itself
        return asyncio.get_event_loop().run_in_executor(
            None, functools.partial(f, *args, **kwargs))
    return wrapped


@background
def op(x):                                           # Do any operation you want
    time.sleep(1)
    print(f"function called for {x=}\n", end='')
    return x * x


loop = asyncio.get_event_loop()                      # Get the event loop

looper = asyncio.gather(*[op(i) for i in range(20)]) # Schedule 20 calls; 20 for a better demo

results = loop.run_until_complete(looper)            # Wait until all finish

print('List comprehension has finished and results are gathered!')
print(results)

This produces output like the following (the print order varies from run to run):

function called for x=5
function called for x=4
function called for x=2
function called for x=0
function called for x=6
function called for x=1
function called for x=7
function called for x=3
function called for x=8
function called for x=9
function called for x=10
function called for x=12
function called for x=11
function called for x=15
function called for x=13
function called for x=14
function called for x=16
function called for x=17
function called for x=18
function called for x=19
List comprehension has finished and results are gathered!
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

Note that all the function calls ran concurrently, hence the shuffled prints; the original order, however, is preserved in the resulting list.

糖粟与秋泊 2024-10-28 01:15:31

Not within a list comprehension AFAIK.

You could certainly do it with a traditional for loop and the multiprocessing/threading modules.
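
For instance, a rough sketch of the for-loop route with explicit Process objects and a Queue (the names and the process count are illustrative):

import multiprocessing

def worker(chunk, out_queue):
    # Each process squares its chunk and reports the results back
    out_queue.put([x * x for x in chunk])

if __name__ == "__main__":
    data = list(range(1000))
    n_procs = 4
    out_queue = multiprocessing.Queue()
    procs = []
    for i in range(n_procs):
        p = multiprocessing.Process(target=worker,
                                    args=(data[i::n_procs], out_queue))
        p.start()
        procs.append(p)
    # Drain the queue before joining, to avoid a deadlock on large results
    results = [out_queue.get() for _ in procs]
    for p in procs:
        p.join()
    print(sum(len(r) for r in results))  # 1000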

记忆之渊 2024-10-28 01:15:31

There is a comprehensive list of parallel packages for Python here:

http://wiki.python.org/moin/ParallelProcessing

I'm not sure whether any of them handle the splitting of a list-comprehension construct directly, but it should be trivial to formulate the same problem in a non-list-comprehension way that can easily be forked to a number of different processors. I'm not familiar with cloud-computing parallelization, but I've had some success with mpi4py on multi-core machines and over clusters (see the sketch below). The biggest issue you'll have to think about is whether the communication overhead is going to kill any gains you get from parallelizing the problem.
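
As a rough illustration, a minimal mpi4py data-decomposition sketch might look like this (run with, e.g., mpiexec -n 4; the contiguous chunking scheme is one arbitrary choice):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    data = list(range(1000))
    # Contiguous chunks preserve the original order after flattening
    bounds = [len(data) * i // size for i in range(size + 1)]
    chunks = [data[bounds[i]:bounds[i + 1]] for i in range(size)]
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)     # each process receives its chunk
partial = [x * x for x in chunk]         # local list comprehension
gathered = comm.gather(partial, root=0)  # collect partial results on rank 0

if rank == 0:
    results = [y for part in gathered for y in part]
    print(len(results))  # 1000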

Edit: The following might also be of interest:

http://www.mblondel.org/journal/2009/11/27/easy-parallelization-with-data-decomposition/
