将多个参数传递给concurrent.futures.Executor.map?

发布于 2024-11-25 17:10:21 字数 561 浏览 2 评论 0 原文

concurrent.futures.Executor.map 接受可变数量的可迭代对象,从中调用给定的函数。 如果我有一个生成器,可以生成通常在适当位置解包的元组,我应该如何调用它?

以下内容不起作用,因为每个生成的元组都作为映射的不同参数给出

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

:生成器,映射所需的参数可能如下所示:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

The concurrent.futures.Executor.map takes a variable number of iterables from which the function given is called. How should I call it if I have a generator that produces tuples that are normally unpacked in place?

The following doesn't work because each of the generated tuples is given as a different argument to map:

args = ((a, b) for (a, b) in c)
for result in executor.map(f, *args):
    pass

Without the generator, the desired arguments to map might look like this:

executor.map(
    f,
    (i[0] for i in args),
    (i[1] for i in args),
    ...,
    (i[N] for i in args),
)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(9

葮薆情 2024-12-02 17:10:21

一个重复的参数,c 中的一个参数

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass

需要解包 c 的项,并且可以解包 c

from itertools import izip
for result in executor.map(f, *izip(*c)):
    pass

需要解包 的项c,无法解包 c

  1. 更改 f 以采用单个参数并解包函数中的参数。
  2. 如果 c 中的每个项目都有可变数量的成员,或者您仅调用 f 几次:

    executor.map(lambda args, f=f: f(*args), c)
    

    它定义了一个新函数,用于从 c 中解压每个项目并调用 f。在 lambda 中使用 f 的默认参数使 flambda 内部成为本地变量,从而减少查找时间。

  3. 如果您有固定数量的参数,并且需要多次调用 f

    从集合导入双端队列
    def itemtee(可迭代,n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = 项目.popleft
            扩展 = items.extend
            而真实:
                如果不是物品:
                    扩展(下一个(它))
                产生 popleft()
        返回 [gen()] * n
    
    执行器.map(f, *itemtee(c, n))
    

其中 n 是参数的数量f 的参数。这是改编自 itertools.tee

One argument that is repeated, one argument in c

from itertools import repeat
for result in executor.map(f, repeat(a), c):
    pass

Need to unpack items of c, and can unpack c

from itertools import izip
for result in executor.map(f, *izip(*c)):
    pass

Need to unpack items of c, can't unpack c

  1. Change f to take a single argument and unpack the argument in the function.
  2. If each item in c has a variable number of members, or you're calling f only a few times:

    executor.map(lambda args, f=f: f(*args), c)
    

    It defines a new function that unpacks each item from c and calls f. Using a default argument for f in the lambda makes f local inside the lambda and so reduces lookup time.

  3. If you've got a fixed number of arguments, and you need to call f a lot of times:

    from collections import deque
    def itemtee(iterable, n=2):
        def gen(it = iter(iterable), items = deque(), next = next):
            popleft = items.popleft
            extend = items.extend
            while True:
                if not items:
                    extend(next(it))
                yield popleft()
        return [gen()] * n
    
    executor.map(f, *itemtee(c, n))
    

Where n is the number of arguments to f. This is adapted from itertools.tee.

聽兲甴掵 2024-12-02 17:10:21

您需要删除 map 调用上的 *

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

这将调用 flen(args) 次,其中 f 应该接受一个参数。

如果您希望 f 接受两个参数,您可以使用 lambda 调用,例如:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass

You need to remove the * on the map call:

args = ((a, b) for b in c)
for result in executor.map(f, args):
    pass

This will call f, len(args) times, where f should accept one parameter.

If you want f to accept two parameters you can use a lambda call like:

args = ((a, b) for b in c)
for result in executor.map(lambda p: f(*p), args):   # (*p) does the unpacking part
    pass
旧人哭 2024-12-02 17:10:21

因此,假设您有一个接受 3 个参数 的函数,并且所有 3 个参数都是动态,并且随着每次调用而不断变化。例如:

def multiply(a,b,c):
    print(a * b * c)

要使用线程多次调用此函数,我首先创建一个元组列表,其中每个元组都是 a、b、c 的一个版本:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

我们知道 concurrent.futures< /code> 的 map 函数将接受第一个参数作为目标函数,第二个参数作为函数每个版本的参数列表将被执行。因此,您可能会进行如下调用:

for _ in executor.map(multiply, arguments) # Error

但这会给您带来错误,即该函数需要 3 个参数,但只得到 1。为了解决这个问题,我们创建一个辅助函数:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

现在,我们可以使用执行器调用这个函数,如下所示:

with ThreadPoolExecutor() as executor:
     for _ in executor.map(helper, arguments):
         pass

这应该会给你想要的结果。

So suppose you have a function which takes 3 arguments and all the 3 arguments are dynamic and keep on changing with every call. For example:

def multiply(a,b,c):
    print(a * b * c)

To call this multiple times using threading, I would first create a list of tuples where each tuple is a version of a,b,c:

arguments = [(1,2,3), (4,5,6), (7,8,9), ....]

To we know that concurrent.futures's map function would accept first argument as the target function and second argument as the list of arguments for each version of the function that will be execute. Therefore, you might make a call like this:

for _ in executor.map(multiply, arguments) # Error

But this will give you error that the function expected 3 arguments but got only 1. To solve this problem, we create a helper function:

def helper(numbers):
    multiply(numbers[0], numbers[1], numbers[2])

Now, we can call this function using executor as follow:

with ThreadPoolExecutor() as executor:
     for _ in executor.map(helper, arguments):
         pass

That should give you the desired results.

苏璃陌 2024-12-02 17:10:21

您可以通过 Python 中的 partial 方法使用柯里化创建新函数

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    # some code

# currying some_func with 'a' argument is repeated
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    executor.map(func, list_of_args):
    ...

如果您需要传递多个相同的参数,可以将它们传递到 部分方法

func = partial(some_func, a, b, c)

You can use currying to create new function via partial method in Python

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(param1, param2):
    # some code

# currying some_func with 'a' argument is repeated
func = partial(some_func, a)
with ThreadPoolExecutor() as executor:
    executor.map(func, list_of_args):
    ...

If you need to pass more than one the same parameters you can pass them to partial method

func = partial(some_func, a, b, c)
暮凉 2024-12-02 17:10:21

下面的代码片段展示了如何使用 ThreadPoolExecutor 向函数发送多个参数:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executer.map
    sending multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()

Here's a code snippet showing how to send multiple arguments to a function with ThreadPoolExecutor:

import concurrent.futures


def hello(first_name: str, last_name: str) -> None:
    """Prints a friendly hello with first name and last name"""
    print('Hello %s %s!' % (first_name, last_name))


def main() -> None:
    """Examples showing how to use ThreadPoolExecutor and executer.map
    sending multiple arguments to a function"""

    # Example 1: Sending multiple arguments using tuples
    # Define tuples with sequential arguments to be passed to hello()
    args_names = (
        ('Bruce', 'Wayne'),
        ('Clark', 'Kent'),
        ('Diana', 'Prince'),
        ('Barry', 'Allen'),
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the tuple (*f) into hello(*args)
        executor.map(lambda f: hello(*f), args_names)

    print()

    # Example 2: Sending multiple arguments using dict with named keys
    # Define dicts with arguments as key names to be passed to hello()
    kwargs_names = (
        {'first_name': 'Bruce', 'last_name': 'Wayne'},
        {'first_name': 'Clark', 'last_name': 'Kent'},
        {'first_name': 'Diana', 'last_name': 'Prince'},
        {'first_name': 'Barry', 'last_name': 'Allen'},
    )
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Using lambda, unpacks the dict (**f) into hello(**kwargs)
        executor.map(lambda f: hello(**f), kwargs_names)


if __name__ == '__main__':
    main()
飞烟轻若梦 2024-12-02 17:10:21

对于 ProcessPoolExecutor.map()< /代码>

与map(func, *iterables)类似,除了:

可迭代对象是立即收集的,而不是延迟收集的;

func 是异步执行的,并且可能会多次调用 func
同时。

因此,ProcessPoolExecutor.map()的用法与Python内置map()的用法相同。这是文档:

返回一个迭代器,该迭代器将函数应用于可迭代的每个项目,
产生结果。 如果传递额外的可迭代参数,
函数必须接受那么多参数并应用于项目
来自所有并行的可迭代。

结论:将几个参数传递给 map()

尝试在 python 3 下运行以下代码片段,你就会很清楚了:

from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

with ProcessPoolExecutor() as pool:
    pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

# 0, 2, 4

array = [(i, i) for i in range(3)]
with ProcessPoolExecutor() as pool:
    pool.map(f, *zip(*array))

# 0, 2, 4

For ProcessPoolExecutor.map():

Similar to map(func, *iterables) except:

the iterables are collected immediately rather than lazily;

func is executed asynchronously and several calls to func may be made
concurrently.

Therefore, the usage of ProcessPoolExecutor.map() is the same as that of Python's build-in map(). Here is the docs:

Return an iterator that applies function to every item of iterable,
yielding the results. If additional iterable arguments are passed,
function must take that many arguments and is applied to the items
from all iterables in parallel.

Conclusion: pass the several parameters to map().

Try running the following snippet under python 3, and you will be quite clear:

from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    print(a+b)

with ProcessPoolExecutor() as pool:
    pool.map(f, (0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (0, 1, 2))

# 0, 2, 4

array = [(i, i) for i in range(3)]
with ProcessPoolExecutor() as pool:
    pool.map(f, *zip(*array))

# 0, 2, 4
小糖芽 2024-12-02 17:10:21

我在这里看到了很多答案,但没有一个像使用 lambda 表达式那样直接:

foo(x,y):

想要使用相同的值(即 xVal 和 yVal)调用上述方法 10 次 ?
以 concurrent.futures.ThreadPoolExecutor() 作为执行器:

for _ in executor.map( lambda _: foo(xVal, yVal), range(0, 10)):
    pass

I have seen so many answers here, but none of them is as straight forward as using lambda expressions:

foo(x,y):
pass

want to call above method 10 times, with same value i.e. xVal and yVal?
with concurrent.futures.ThreadPoolExecutor() as executor:

for _ in executor.map( lambda _: foo(xVal, yVal), range(0, 10)):
    pass
丿*梦醉红颜 2024-12-02 17:10:21

假设您在下面显示的数据框中有这样的数据,并且您希望将第一两列传递给一个函数,该函数将读取图像并预测特征,然后计算差异并返回差异值。

注意:您可以根据您的要求有任何场景,并且可以分别定义函数。

下面的代码片段将这两列作为参数并传递给线程池机制(还显示进度条)

在此处输入图像描述

''' function that will give the difference of two numpy feature matrix'''
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
       arr1 = ''' read 1st image and extract feature '''
       arr2 = ''' read 2nd image and extract feature '''
       diff = arr1.ravel() - arr2.ravel() + esp    
       return diff

'''Using ThreadPoolExecutor from concurrent.futures with multiple argument'''

with ThreadPoolExecutor() as executor:
        result = np.array(
                         list(tqdm(
                                   executor.map(lambda x : function(*x), [(i,j) for i,j in df[['image_1','image_2']].values]),
                               total=len(df)
                                  ) 
                             )
                          )

在此处输入图像描述

lets say you have data like this in data frame shown below and you want to pass 1st two columns to a function which will read the images and predict the fetaures and then calculate the difference and return the difference value.

Note: you can have any scenario as per your requirement and respectively you can define the function.

The below code snippet will takes these two columns as argument and pass to the Threadpool mechanism (showing the progress bar also)

enter image description here

''' function that will give the difference of two numpy feature matrix'''
def getDifference(image_1_loc, image_2_loc, esp=1e-7):
       arr1 = ''' read 1st image and extract feature '''
       arr2 = ''' read 2nd image and extract feature '''
       diff = arr1.ravel() - arr2.ravel() + esp    
       return diff

'''Using ThreadPoolExecutor from concurrent.futures with multiple argument'''

with ThreadPoolExecutor() as executor:
        result = np.array(
                         list(tqdm(
                                   executor.map(lambda x : function(*x), [(i,j) for i,j in df[['image_1','image_2']].values]),
                               total=len(df)
                                  ) 
                             )
                          )

enter image description here

小苏打饼 2024-12-02 17:10:21

下面是我一直使用的一个简单实用程序。

########### Start of Utility Code ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial


def catch(fn):
    def wrap(*args, **kwargs):
        result = None
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap


def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    return fn(*args, *kwargs)


def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)


########### End of Utility Code ###########

示例用法 -

######### Start of example usage ###########

import time


@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Error - val was 5")
    else:
        return f"No error val is {val}"


def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)


def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)

handle_success = print


if __name__ == "__main__":
    # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)

######### End of example usage ###########

A simple utility that I use all the time is below.

########### Start of Utility Code ###########

import os
import sys
import traceback

from concurrent import futures
from functools import partial


def catch(fn):
    def wrap(*args, **kwargs):
        result = None
        try:
            result = fn(*args, **kwargs)
        except Exception as err:
            type_, value_, traceback_ = sys.exc_info()
            return None, (
                args,
                "".join(traceback.format_exception(type_, value_, traceback_)),
            )
        else:
            return result, (args, None)

    return wrap


def top_level_wrap(fn, arg_tuple):
    args, kwargs = arg_tuple
    return fn(*args, *kwargs)


def create_processes(fn, values, handle_error, handle_success):
    cores = os.cpu_count()
    max_workers = 2 * cores + 1

    to_exec = partial(top_level_wrap, fn)

    with futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        for result, error in executor.map(to_exec, values):
            args, tb = error
            if tb is not None:
                handle_error(args, tb)
            else:
                handle_success(result)


########### End of Utility Code ###########

Example usage -

######### Start of example usage ###########

import time


@catch
def fail_when_5(val):
    time.sleep(val)
    if val == 5:
        raise Exception("Error - val was 5")
    else:
        return f"No error val is {val}"


def handle_error(args, tb):
    print("args is", args)
    print("TB is", tb)


def top_level(val, val_2, test=None, test2="ok"):
    print(val_2, test, test2)
    return fail_when_5(val)

handle_success = print


if __name__ == "__main__":
    # SHAPE -> ( (args, kwargs), (args, kwargs), ... )
    values = tuple(
        ((x, x + 1), {"test": f"t_{x+2}", "test2": f"t_{x+3}"}) for x in range(10)
    )
    create_processes(top_level, values, handle_error, handle_success)

######### End of example usage ###########
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文