How to get the return value from a thread?

Posted on 2024-11-27 00:46:25 · 418 characters · 3 views · 0 comments

The function foo below returns a string 'foo'. How can I get the value 'foo' which is returned from the thread's target?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

The "one obvious way to do it", shown above, doesn't work: thread.join() returned None.


Comments (30)

愁以何悠 2024-12-04 00:46:25

One way I've seen is to pass a mutable object, such as a list or a dictionary, to the thread's constructor, along with an index or other identifier of some sort. The thread can then store its results in its dedicated slot in that object. For example:

from threading import Thread

def foo(bar, result, index):
    print('hello {0}'.format(bar))
    result[index] = "foo"   # each thread writes only to its own slot

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print(" ".join(results))  # what sound does a metasyntactic locomotive make?

If you really want join() to return the return value of the called function, you can do this with a Thread subclass like the following (Python 2):

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

That gets a little hairy because of some name mangling, and it accesses "private" data structures that are specific to the Thread implementation... but it works.

For Python 3:

from threading import Thread

class ThreadWithReturnValue(Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None

    def run(self):
        # _target, _args and _kwargs are private attributes of threading.Thread
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, *args):
        # *args forwards an optional timeout to Thread.join()
        Thread.join(self, *args)
        return self._return
私野 2024-12-04 00:46:25

FWIW, the multiprocessing module has a nice interface for this using the Pool class. And if you want to stick with threads rather than processes, you can just use the multiprocessing.pool.ThreadPool class as a drop-in replacement.

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print('hello {0}'.format(bar))
    return 'foo' + baz

pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo'))  # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
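
The same interface extends to several tasks at once. The following is a minimal sketch, assuming Python 3.3+ (where the pool can be used as a context manager); the helper name run_jobs, the pool size of 4 and the 10-second timeout are illustrative choices, not part of the answer above.

```python
from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    return 'foo' + baz

def run_jobs(jobs):
    """Hypothetical helper: run foo once per (bar, baz) pair and
    return the results in submission order."""
    # Leaving the with-block terminates the pool, so every result
    # is fetched inside it.
    with ThreadPool(processes=4) as pool:
        pending = [pool.apply_async(foo, job) for job in jobs]
        # get() blocks until that task has finished; timeout is in seconds.
        return [p.get(timeout=10) for p in pending]

results = run_jobs([('world', 'a'), ('world', 'b'), ('world', 'c')])
print(results)
```

Each AsyncResult is tied to the call that produced it, so collecting them in a list keeps the results in submission order even if the threads finish in a different order.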
你如我软肋 2024-12-04 00:46:25

In Python 3.2+, the standard-library concurrent.futures module provides a higher-level API to threading, including passing return values or exceptions from a worker thread back to the main thread. You can call the result() method on a Future instance, and it will wait until the thread has completed before returning the result value of the thread's function.

import concurrent.futures

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(foo, 'world!')
    return_value = future.result()
    print(return_value)
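
The exception side of that API can be sketched as follows. If the submitted callable raises, result() re-raises the same exception in the calling thread. The worker function risky and the outcomes dictionary are made up for this illustration.

```python
import concurrent.futures

def risky(n):
    # Hypothetical worker: fails for negative input.
    if n < 0:
        raise ValueError('negative input: {}'.format(n))
    return n * 2

outcomes = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Map each Future back to the argument it was submitted with.
    futures = {executor.submit(risky, n): n for n in (1, -1)}
    for future in concurrent.futures.as_completed(futures):
        n = futures[future]
        try:
            outcomes[n] = future.result()
        except ValueError as exc:
            # The exception raised in the worker thread surfaces here.
            outcomes[n] = 'failed: {}'.format(exc)

print(outcomes)
```

This means a failure in the worker is not silently lost, which is what happens to an unhandled exception in a plain threading.Thread target unless you capture it yourself.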
梦醒时光 2024-12-04 00:46:25

Jake's answer is good, but if you don't want to use a threadpool (you don't know how many threads you'll need, but create them as needed) then a good way to transmit information between threads is the built-in Queue.Queue class (queue.Queue in Python 3), as it offers thread safety.

I created the following decorator to make it act in a similar fashion to the threadpool:

import threading
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def threaded(f, daemon=False):

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.result_queue = q
        t.start()
        return t

    return wrap

Then you just use it as:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print(y)

# this blocks, waiting for the result
result = y.result_queue.get()
print(result)

The decorated function creates a new thread each time it's called and returns a Thread object that contains the queue that will receive the result.

UPDATE

It's been quite a while since I posted this answer, but it still gets views so I thought I would update it to reflect the way I do this in newer versions of Python:

Python 3.2 added the concurrent.futures module, which provides a high-level interface for running tasks in parallel. It provides ThreadPoolExecutor and ProcessPoolExecutor, so you can use a thread pool or a process pool with the same API.

One benefit of this API is that submitting a task to an Executor returns a Future object, which completes with the return value of the callable you submitted.

This makes attaching a queue object unnecessary, which simplifies the decorator quite a bit:

from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

This uses a module-level default thread pool executor if none is passed in.

The usage is very similar to before:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print(y)

# this blocks, waiting for the result
result = y.result()
print(result)

If you're using Python 3.4+, one really nice feature of using this method (and Future objects in general) is that the returned future can be wrapped to turn it into an asyncio.Future with asyncio.wrap_future. This makes it work easily with coroutines:

result = await asyncio.wrap_future(long_task(10))

If you don't need access to the underlying concurrent.futures.Future object, you can include the wrap in the decorator:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Then, whenever you need to push CPU-intensive or blocking code off the event loop thread, you can put it in a decorated function:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
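
Since await is only valid inside a coroutine, a complete runnable version of the idea might look like the sketch below. It assumes Python 3.7+ for asyncio.run; the names _POOL, blocking_add and main are illustrative and not from the answer.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

_POOL = ThreadPoolExecutor(max_workers=2)  # hypothetical module-level pool

def threadpool(f):
    @wraps(f)
    def wrap(*args, **kwargs):
        # wrap_future attaches to the current event loop, so the
        # decorated function should be called from inside a coroutine.
        return asyncio.wrap_future(_POOL.submit(f, *args, **kwargs))
    return wrap

@threadpool
def blocking_add(x):
    time.sleep(0.1)  # stands in for blocking work
    return x + 5

async def main():
    # The coroutine is suspended here while the function runs on the pool.
    return await blocking_add(10)

result = asyncio.run(main())
print(result)
_POOL.shutdown()
```

The event loop stays free to run other coroutines while blocking_add sleeps in a worker thread.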
朕就是辣么酷 2024-12-04 00:46:25

Another solution that doesn't require changing your existing code:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result             # Python 2.x
#print(result)           # Python 3.x

It can also be easily adjusted to a multi-threaded environment:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result         # Python 2.x
    #print(result)       # Python 3.x
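
For reference, a self-contained Python 3 variant of the multi-threaded pattern could look like the sketch below. Tagging each result with its index is an addition made here so that results can be matched to their inputs; the helper name run_all is hypothetical.

```python
from queue import Queue
from threading import Thread

def foo(bar):
    return 'foo ' + bar

def run_all(inputs):
    """Hypothetical helper: run foo on every input in its own thread."""
    que = Queue()
    threads = [
        # Arguments are passed explicitly, so each lambda sees its own i and arg.
        Thread(target=lambda q, i, arg: q.put((i, foo(arg))), args=(que, i, arg))
        for i, arg in enumerate(inputs)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # All threads have finished, so the queue holds every result.
    results = [None] * len(inputs)
    while not que.empty():
        i, value = que.get()
        results[i] = value
    return results

print(run_all(['a', 'b', 'c']))
```

Without the index, results come out of the queue in completion order, which need not match the order in which the threads were started.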
你列表最软的妹 2024-12-04 00:46:25

UPDATE:

I think there's a significantly simpler and more concise way to save the result of the thread, and in a way that keeps the interface virtually identical to the threading.Thread class (please let me know if there are edge cases; I haven't tested this as much as my original post below):

import threading

class ConciseResult(threading.Thread):
    def run(self):
        self.result = self._target(*self._args, **self._kwargs)

To be robust and avoid potential errors:

import threading

class ConciseRobustResult(threading.Thread):
    def run(self):
        try:
            if self._target is not None:
                self.result = self._target(*self._args, **self._kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs

Short explanation: we override only the run method of threading.Thread, and modify nothing else. This allows us to use everything else the threading.Thread class does for us, without needing to worry about missing potential edge cases such as _private attribute assignments or custom attribute modifications in the way that my original post does.

We can verify that we only modify the run method by looking at the output of help(ConciseResult) and help(ConciseRobustResult). The only method/attribute/descriptor included under Methods defined here: is run, and everything else comes from the inherited threading.Thread base class (see the Methods inherited from threading.Thread: section).

To test either of these implementations using the example code below, substitute ConciseResult or ConciseRobustResult for ThreadWithResult in the main function below.

Original post using a closure function in the __init__ method:

Most answers I've found are long and require being familiar with other modules or advanced python features, and will be rather confusing to someone unless they're already familiar with everything the answer talks about.

Working code for a simplified approach:

import threading

class ThreadWithResult(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
        def function():
            self.result = target(*args, **kwargs)
        super().__init__(group=group, target=function, name=name, daemon=daemon)

Example code:

import time, random


def function_to_thread(n):
    count = 0
    while count < 3:
        print(f'still running thread {n}')
        count += 1
        time.sleep(3)
    result = random.random()
    print(f'Return value of thread {n} should be: {result}')
    return result


def main():
    thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
    thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(thread1.result)
    print(thread2.result)

main()

Explanation:
I wanted to simplify things significantly, so I created a ThreadWithResult class and had it inherit from threading.Thread. The nested function function in __init__ calls the threaded function we want to save the value of, and saves the result of that nested function as the instance attribute self.result after the thread finishes executing.

Creating an instance of this is identical to creating an instance of threading.Thread. Pass in the function you want to run on a new thread to the target argument and any arguments that your function might need to the args argument and any keyword arguments to the kwargs argument.

e.g.

my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))

I think this is significantly easier to understand than the vast majority of answers, and this approach requires no extra imports! I included the time and random modules to simulate the behavior of a thread, but they're not required to achieve the functionality asked for in the original question.

I know I'm answering this looong after the question was asked, but I hope this can help more people in the future!


EDIT: I created the save-thread-result PyPI package to allow you to access the same code above and reuse it across projects (GitHub code is here). The PyPI package fully extends the threading.Thread class, so you can set any attributes you would set on threading.Thread on the ThreadWithResult class as well!

The original answer above goes over the main idea behind this subclass, but for more information, see the more detailed explanation (from the module docstring) here.

Quick usage example:

pip3 install -U save-thread-result     # MacOS/Linux
pip  install -U save-thread-result     # Windows

python3     # MacOS/Linux
python      # Windows
from save_thread_result import ThreadWithResult

# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
    target = my_function,
    args   = (my_function_arg1, my_function_arg2, ...),
    kwargs = {my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...}
)

thread.start()
thread.join()
# note: a falsy result (0, '', None) also takes the else branch below
if getattr(thread, 'result', None):
    print(thread.result)
else:
    # thread.result attribute not set - something caused
    # the thread to terminate BEFORE the thread finished
    # executing the function passed in through the
    # `target` argument
    print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')

# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)
゛清羽墨安 2024-12-04 00:46:25

Parris / kindall's join/return answer, ported to Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Note, the Thread class is implemented differently in Python 3.

仅此而已 2024-12-04 00:46:25

I stole kindall's answer and cleaned it up just a little bit.

The key part is adding *args and **kwargs to join() in order to handle the timeout (this first version is Python 2 only):

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)
        
        self._return = None
    
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
    
    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)
        
        return self._return

UPDATED ANSWER BELOW

This is my most popularly upvoted answer, so I decided to update with code that will run on both py2 and py3.

Additionally, I see many answers to this question that show a lack of comprehension regarding Thread.join(). Some completely fail to handle the timeout arg. But there is also a corner case you should be aware of: when (1) the target function can return None and (2) you also pass the timeout arg to join(). Please see "TEST 4" to understand this corner case.

ThreadWithReturn class that works with py2 and py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

_thread_target_key, _thread_args_key, _thread_kwargs_key = (
    ('_target', '_args', '_kwargs')
    if sys.version_info >= (3, 0) else
    ('_Thread__target', '_Thread__args', '_Thread__kwargs')
)

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None
    
    def run(self):
        target = getattr(self, _thread_target_key)
        if target is not None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )
    
    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Some sample tests are shown below:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if seconds is not None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Can you identify the corner-case that we may possibly encounter with TEST 4?

The problem is that we expect giveMe() to return None (see TEST 2), but we also expect join() to return None if it times out.

returned is None means either:

(1) that's what giveMe() returned, or

(2) join() timed out

This example is trivial since we know that giveMe() will always return None. But in a real-world case (where the target may legitimately return None or something else) we'd want to explicitly check what happened.

Below is how to address this corner-case:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.is_alive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
眉黛浅 2024-12-04 00:46:25

Using a queue:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
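
One caveat with this pattern: if the target raises before it reaches put(), the queue stays empty and a plain get() blocks forever. A possible variation, shown only as a sketch, is to always put a (status, payload) pair on the queue. The name calc_square_safe and the "ok"/"error" tags are made up for this illustration.

```python
import queue
import threading


def calc_square_safe(nums, out_queue):
    # Always put exactly one item on the queue: either the result or the exception.
    try:
        out_queue.put(("ok", [x * x for x in nums]))
    except Exception as exc:
        out_queue.put(("error", exc))


results = queue.Queue()

good = threading.Thread(target=calc_square_safe, args=([1, 2, 3], results))
good.start()
good.join()
good_status, good_value = results.get()

# "two" * "two" raises TypeError inside the worker thread.
bad = threading.Thread(target=calc_square_safe, args=([1, "two", 3], results))
bad.start()
bad.join()
bad_status, bad_value = results.get()
```

The caller can then check the status tag and decide whether to use the value or re-raise the exception in the main thread.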
又怨 2024-12-04 00:46:25

The shortest and simplest way I've found to do this is to take advantage of Python classes and their dynamic properties. You can retrieve the current thread from within the context of your spawned thread using threading.current_thread(), and assign the return value to a property.

import threading

def some_target_function():
    # Your code here.
    threading.current_thread().return_value = "Some return value."

your_thread = threading.Thread(target=some_target_function)
your_thread.start()
your_thread.join()

return_value = your_thread.return_value
print(return_value)
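
One thing to watch with this approach: the attribute only exists if the target actually reached the assignment. A small sketch of a defensive read using getattr with a default follows; the lookup function and its arguments are hypothetical and only serve the illustration.

```python
import threading


def lookup(key, table):
    if key not in table:
        return  # early exit: return_value is never set on this thread
    threading.current_thread().return_value = table[key]


table = {"a": 1}

hit = threading.Thread(target=lookup, args=("a", table))
miss = threading.Thread(target=lookup, args=("z", table))
for t in (hit, miss):
    t.start()
for t in (hit, miss):
    t.join()

# getattr with a default avoids AttributeError when the target never set the attribute.
found = getattr(hit, "return_value", None)
missing = getattr(miss, "return_value", None)
```

Reading miss.return_value directly would raise AttributeError here, which is why the default is used.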
_蜘蛛 2024-12-04 00:46:25

My solution to the problem is to wrap the function and thread in a class. It does not require pools, queues, or C-type variable passing. It is also non-blocking: you check the status instead. See the example of how to use it at the end of the code.

import threading

class ThreadWorker():
    '''
    The basic idea: given a function, create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it, check its status, and get data out of the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.is_alive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.is_alive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.is_alive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print(add_worker.start((1,2,)))
print(add_worker.status())
print(add_worker.get_results())
雨的味道风的声音 2024-12-04 00:46:25

Taking into consideration @iman's comment on @JakeBiesinger's answer, I have reworked it to support a variable number of threads:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print('hello {0}'.format(bar))
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo')))  # tuple of args for foo

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print(results)

pool.close()
pool.join()
瞳孔里扚悲伤 2024-12-04 00:46:25

I'm using this wrapper, which conveniently turns any function into one that runs in a thread, taking care of its return value or exception. It doesn't add Queue overhead.

import sys
import threading

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                # Python 3: re-raise with the original traceback
                # (Python 2 would need: raise th.exc[0], th.exc[1], th.exc[2])
                raise th.exc[1].with_traceback(th.exc[2])
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.ret = None  # returned by get() if the timeout expires first
        th.get = get
        th.start()
        return th
    return start

Usage Examples

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Notes on threading module

Convenient return-value and exception handling for a threaded function is a frequent "Pythonic" need and should arguably already be offered by the threading module, possibly directly in the standard Thread class. ThreadPool has far too much overhead for simple tasks: 3 managing threads and a lot of bureaucracy. Unfortunately, Thread's layout was originally copied from Java, which you can see e.g. from the still useless first (!) constructor parameter, group.

唠甜嗑 2024-12-04 00:46:25

Based on what kindall mentioned, here's a more generic solution that works with Python 3.

import threading

class ThreadWithReturnValue(threading.Thread):
    def __init__(self, *init_args, **init_kwargs):
        threading.Thread.__init__(self, *init_args, **init_kwargs)
        self._return = None
    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)
    def join(self, timeout=None):
        threading.Thread.join(self, timeout)
        return self._return

Usage

import requests

th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code  # => 200
皓月长歌 2024-12-04 00:46:25

join() always returns None; I think you should subclass Thread to handle return codes and the like.

盛夏尉蓝 2024-12-04 00:46:25

You can define a mutable object in a scope enclosing the threaded function and add the result to it. (I also modified the code to be Python 3 compatible.)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

This prints {'world!': 'foo'}

If you use the function input as the key to your results dict, every unique input is guaranteed to get its own entry in the results.
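
The flip side is that repeated inputs share one key, so later results overwrite earlier ones. A small sketch of that effect, plus a variant keyed by position instead, is below. The functions shout and shout_at are made up for the illustration. The sketch also relies on a single dict item assignment being atomic in CPython, which is an implementation detail rather than a language guarantee.

```python
from threading import Thread

returns = {}


def shout(word):
    returns[word] = word.upper()


inputs = ["alpha", "beta", "alpha"]

threads = [Thread(target=shout, args=(w,)) for w in inputs]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Three threads ran, but "alpha" appears twice, so only two keys exist.

indexed = {}


def shout_at(index, word):
    # Keying by position keeps one slot per thread, even for repeated inputs.
    indexed[index] = word.upper()


threads = [Thread(target=shout_at, args=(i, w)) for i, w in enumerate(inputs)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Keying by index is essentially the same idea as passing a results list and a slot number to each thread.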

喵星人汪星人 2024-12-04 00:46:25

You can use pool.apply_async() of ThreadPool() to return the value from test() as shown below:

from multiprocessing.pool import ThreadPool

def test(num1, num2):
    return num1 + num2

pool = ThreadPool(processes=1) # Here
result = pool.apply_async(test, (2, 3)) # Here
print(result.get()) # 5

And, you can also use submit() of concurrent.futures.ThreadPoolExecutor() to return the value from test() as shown below:

from concurrent.futures import ThreadPoolExecutor

def test(num1, num2):
    return num1 + num2

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(test, 2, 3) # Here
print(future.result()) # 5
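
When several calls are submitted at once, the same executor can collect all the return values. The following is a minimal sketch using only the standard library; the function add and the input pairs are placeholders chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add(num1, num2):
    return num1 + num2


pairs = [(1, 2), (3, 4), (5, 6)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in the order of the inputs.
    ordered = list(executor.map(add, *zip(*pairs)))

    # submit() + as_completed() yields futures as they finish, in no fixed order,
    # so each future is mapped back to the arguments it was created with.
    futures = {executor.submit(add, a, b): (a, b) for a, b in pairs}
    by_input = {futures[f]: f.result() for f in as_completed(futures)}
```

future.result() also re-raises any exception thrown inside the worker, so errors are not silently lost.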

And, instead of return, you can use a list, result, as shown below:

from threading import Thread

def test(num1, num2, r):
    r[0] = num1 + num2 # Instead of "return"

result = [None] # Here

thread = Thread(target=test, args=(2, 3, result))
thread.start()
thread.join()
print(result[0]) # 5

And instead of return, you can also use a queue, as shown below:

from threading import Thread
import queue

def test(num1, num2, q):
    q.put(num1 + num2) # Instead of "return" 

result_queue = queue.Queue() # Here (a separate name, so the queue module is not shadowed)

thread = Thread(target=test, args=(2, 3, result_queue))
thread.start()
thread.join()
print(result_queue.get()) # 5
我们的影子 2024-12-04 00:46:25

This is a pretty old question, but I wanted to share a simple solution that has worked for me and helped my dev process.

The idea behind this answer is that the "new" target function, inner, assigns the result of the original function (passed in through __init__) to the wrapper's result instance attribute by means of a closure.

This allows the wrapper class to hold onto the return value for callers to access at any time.

NOTE: This method doesn't need to use any name-mangled or private methods of the threading.Thread class, although generator (yield) functions have not been considered (the OP did not mention them).

Enjoy!

from threading import Thread as _Thread


class ThreadWrapper:
    def __init__(self, target, *args, **kwargs):
        self.result = None
        self._target = self._build_threaded_fn(target)
        self.thread = _Thread(
            target=self._target,
            *args,
            **kwargs
        )

    def _build_threaded_fn(self, func):
        def inner(*args, **kwargs):
            self.result = func(*args, **kwargs)
        return inner

Additionally, you can run pytest (assuming you have it installed) with the following code to demonstrate the results:

import time
from commons import ThreadWrapper


def test():

    def target():
        time.sleep(1)
        return 'Hello'

    wrapper = ThreadWrapper(target=target)
    wrapper.thread.start()

    r = wrapper.result
    assert r is None

    time.sleep(2)

    r = wrapper.result
    assert r == 'Hello'
玩套路吗 2024-12-04 00:46:25

Change your target so that
1) it takes an extra argument q
2) every statement return foo is replaced with q.put(foo); return

so a function

def func(a):
    ans = a * a
    return ans

would become

def func(a, q):
    ans = a * a
    q.put(ans)
    return

and then you would proceed as such

from queue import Queue  # the module is named Queue in Python 2
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in range(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [ans_q.get() for _ in range(len(threads))]

You can also use function decorators/wrappers so that existing functions can serve as targets without modification, while still following this basic scheme.
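
A rough sketch of what such a wrapper could look like is below. The name returns_via_queue and the convention that the queue is passed as the last positional argument are assumptions made for this example, not something the scheme above prescribes.

```python
import functools
from queue import Queue
from threading import Thread


def returns_via_queue(func):
    """Wrap func so that its last positional argument is a queue receiving the result."""
    @functools.wraps(func)
    def wrapper(*args):
        *real_args, q = args
        q.put(func(*real_args))
    return wrapper


def square(a):
    # An ordinary function with a normal return statement, left unmodified.
    return a * a


ans_q = Queue()
threads = [Thread(target=returns_via_queue(square), args=(i, ans_q)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Arrival order on the queue is not guaranteed, hence the sort.
results = sorted(ans_q.get() for _ in threads)
```

The wrapper is applied at the call site here; using it with the @ syntax would work as well, but then the function could no longer be called without a queue.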

美人迟暮 2024-12-04 00:46:25

GuySoft's idea is great, but I think the object does not necessarily have to inherit from Thread and start() could be removed from interface:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
追星践月 2024-12-04 00:46:25

As mentioned, a multiprocessing pool is much slower than basic threading. Using queues, as proposed in some answers here, is a very effective alternative. I have used one with dictionaries in order to run a lot of small threads and collect multiple answers by merging them into a single dictionary:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
聚集的泪 2024-12-04 00:46:25

Here is the version that I created of @Kindall's answer.

This version makes it so that all you have to do is input your command with arguments to create the new thread.

This was made with Python 3.8:

from threading import Thread
from typing import Any

def test(plug, plug2, plug3):
    print(f"hello {plug}")
    print(f'I am the second plug : {plug2}')
    print(plug3)
    return 'I am the return Value!'

def test2(msg):
    return f'I am from the second test: {msg}'

def test3():
    print('hello world')

def NewThread(com, Returning: bool, *arguments) -> Any:
    """
    Will create a new thread for a function/command.

    :param com: Command to be Executed
    :param arguments: Arguments to be sent to Command
    :param Returning: True/False Will this command need to return anything
    """
    class NewThreadWorker(Thread):
        def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
                     daemon = None):
            Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
            
            self._return = None
        
        def run(self):
            if self._target is not None:
                self._return = self._target(*self._args, **self._kwargs)
        
        def join(self):
            Thread.join(self)
            return self._return
    
    ntw = NewThreadWorker(target = com, args = (*arguments,))
    ntw.start()
    if Returning:
        return ntw.join()

if __name__ == "__main__":
    print(NewThread(test, True, 'hi', 'test', test2('hi')))
    NewThread(test3, True)
轮廓§ 2024-12-04 00:46:25

Thanks to @alec-cureau. The code below works fine for me.
I replaced the deprecated currentThread():

import threading
import time

def helo(name):
    print("hello", name)
    time.sleep(5)
    threading.current_thread().return_value = name
    return True

t1 = threading.Thread(target=helo, args=('bhargav',))
t2 = threading.Thread(target=helo, args=('kushal',))

t1.start()
t2.start()

print(t1.join())
print(t2.join())

print( "returned", t1.return_value)
print("returned", t2.return_value)
此刻的回忆 2024-12-04 00:46:25

One common solution is to wrap your function foo with a wrapper like

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Then the whole code may look like this

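import queue
import threading
import time

def run_all(target, args_list, max_num):
    # Run target once per argument tuple, keeping the live thread count below max_num
    result = queue.Queue()

    def task_wrapper(*args):
        result.put(target(*args))

    threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

    for t in threads:
        t.start()
        # threading.enumerate() also counts the main thread
        while len(threading.enumerate()) >= max_num:
            time.sleep(0.01)
    for t in threads:
        t.join()
    return result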

Note

One important issue is that the return values may be unordered.
(In fact, the return values do not have to be saved in a queue; you can choose any thread-safe data structure.)
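
If the order matters, one option is to tag every result with the position of its input and sort afterwards. The following is only a sketch of that idea; run_ordered is a hypothetical helper, and pow merely stands in for a real target.

```python
import queue
import threading


def run_ordered(target, args_list):
    """Run target once per argument tuple and return the results in input order."""
    result = queue.Queue()

    def task_wrapper(index, args):
        # Store the input position next to the value so the order can be restored.
        result.put((index, target(*args)))

    threads = [
        threading.Thread(target=task_wrapper, args=(i, args))
        for i, args in enumerate(args_list)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    pairs = [result.get() for _ in threads]
    # Sort on the index only, so the values themselves need not be comparable.
    return [value for _, value in sorted(pairs, key=lambda pair: pair[0])]


ordered = run_ordered(pow, [(2, 3), (3, 2), (10, 0)])
```

This assumes the target does not raise; a failing thread would put nothing on the queue and the final get() calls would block.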

冷夜 2024-12-04 00:46:25

Kindall's answer in Python 3

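from threading import Thread

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        # daemon is keyword-only in Thread.__init__, so it must be passed by name
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
        self._return = None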

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
笑着哭最痛 2024-12-04 00:46:25

I don't know whether this will work for you, but I chose to create a global object (mostly dictionaries or nested lists) so that a function can access and mutate it. I know it takes more resources, but we are not dealing with quantum science, so I guess we can spare a little more RAM, provided RAM consumption increases linearly with CPU usage.
Here is some example code:

import json
import threading

import requests

# Shared global result store; every thread writes its results here.
dictionary = {}

# Assumed example input (not defined in the original snippet): four lists of texts.
L = [['a'], ['b'], ['c'], ['d']]

def get_val1(texts):
    print('#1')
    for elem in texts:
        dictionary[elem] = json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val2(texts):
    print('#2')
    for elem in texts:
        dictionary[elem] = json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val3(texts):
    print('#3')
    for elem in texts:
        dictionary[elem] = json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)
def get_val4(texts):
    print('#4')
    for elem in texts:
        dictionary[elem] = json.loads(requests.post(f'https://api.example.com?text={elem}&Return=JSON').text)

# One thread per list of texts.
t1 = threading.Thread(target=get_val1, args=(L[0],))
t2 = threading.Thread(target=get_val2, args=(L[1],))
t3 = threading.Thread(target=get_val3, args=(L[2],))
t4 = threading.Thread(target=get_val4, args=(L[3],))

t1.start()
t2.start()
t3.start()
t4.start()

# Wait for all threads; afterwards `dictionary` holds every result.
t1.join()
t2.join()
t3.join()
t4.join()

This program runs 4 threads, each of which fetches data for the texts in L[i] and stores the data returned from the API in the dictionary.
Whether this is beneficial varies from program to program. For small to medium computation tasks, this object mutation works pretty fast and uses only a few percent more resources.
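
The four near-identical functions can be collapsed into a single worker, and a lock makes the writes to the shared dictionary explicit. This is only a minimal sketch: `fetch`, `worker`, `run`, `results`, and `results_lock` are hypothetical names, and `fetch` is a stand-in for the API request that does no network I/O.

```python
import threading

results = {}                      # shared object mutated by every thread
results_lock = threading.Lock()   # guards writes to the shared dict


def fetch(text):
    """Hypothetical stand-in for the API request in the answer above."""
    return {"text": text, "length": len(text)}


def worker(texts):
    """Store one result per text in the shared dictionary."""
    for text in texts:
        value = fetch(text)       # do the slow work outside the lock
        with results_lock:
            results[text] = value


def run(batches):
    """Start one thread per batch, wait for all of them, return the dict."""
    threads = [threading.Thread(target=worker, args=(batch,)) for batch in batches]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(run([["a", "bb"], ["ccc"], ["dddd"], ["eeeee"]]))
```

Reading `results` is only safe after every `join()` has returned, which is why `run` waits for all threads before handing the dictionary back.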

や莫失莫忘 2024-12-04 00:46:25

Forget all those complicated solutions.

Just install a suitable package such as:
https://pypi.org/project/thread-with-results/

凉城 2024-12-04 00:46:25

I know this thread is old, but I faced the same problem. If you are willing to use thread.join(), you can have the target method store its result on the instance and read it back after the join:

import threading

class test:

    def __init__(self):
        self.msg=""

    def hello(self,bar):
        print('hello {}'.format(bar))
        self.msg="foo"


    def main(self):
        thread = threading.Thread(target=self.hello, args=('world!',))
        thread.start()
        thread.join()
        print(self.msg)

g=test()
g.main()
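
With more than one thread, a single `self.msg` attribute would be overwritten, so each thread needs its own holder object. A rough sketch of that variation follows; the `Task` class and the second argument `'threads!'` are illustrative assumptions, not part of the original answer.

```python
import threading


class Task:
    """Hypothetical holder: runs `hello` in a thread and keeps its result."""

    def __init__(self, bar):
        self.bar = bar
        self.msg = None   # filled in by the thread

    def hello(self):
        self.msg = 'hello {}'.format(self.bar)


tasks = [Task(name) for name in ('world!', 'threads!')]
threads = [threading.Thread(target=task.hello) for task in tasks]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()   # results are only guaranteed to be set after join()

messages = [task.msg for task in tasks]
print(messages)
```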
天气好吗我好吗 2024-12-04 00:46:25

Best way: define a global variable, then change it in the threaded function. There is nothing to pass in or retrieve.

from threading import Thread

# global var
random_global_var = 5

def function():
    global random_global_var
    random_global_var += 1

domath = Thread(target=function)
domath.start()
domath.join()
print(random_global_var)

# result: 6
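
The example above uses a single thread. When several threads update the same global, `+=` is a read-modify-write step that is not guaranteed to be atomic, so it is safer to guard it with a lock. A minimal sketch, where `counter`, `counter_lock`, and `increment` are hypothetical names chosen for illustration:

```python
from threading import Lock, Thread

counter = 5
counter_lock = Lock()   # serializes updates to the global


def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:
            counter += 1


threads = [Thread(target=increment, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 5 + 4 * 1000 = 4005
```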
早茶月光 2024-12-04 00:46:25
# python 3.x
from queue import Queue
from threading import Thread
import time

import numpy as np


def foo(bar):
    # Simulate work by sleeping for a random 1 to 3 seconds.
    time.sleep(np.random.randint(1, 4, 1).item())
    # print('hello{0}'.format(bar))
    return bar


que = Queue()
threads_list = []

for i in range(50):
    # Each thread puts foo's return value on the shared queue.
    threads_list.append(Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, str(i))))
    threads_list[-1].start()

# Wait for every thread to finish before draining the queue.
for t in threads_list:
    t.join()

results = []
while not que.empty():
    result = que.get()
    results.append(result)
    print(result)

print(results)
print(len(results))
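
Results come off the queue in the order the threads finish, not the order they were started. If the original order matters, one option is to put `(index, value)` pairs on the queue and sort them afterwards. A small sketch under that assumption follows; the `worker` helper is hypothetical, and `foo` is simplified here so that it returns immediately.

```python
from queue import Queue
from threading import Thread


def foo(bar):
    return 'hello {}'.format(bar)


def worker(q, index, arg):
    # Tag each result with the index of the thread that produced it.
    q.put((index, foo(arg)))


que = Queue()
threads = [Thread(target=worker, args=(que, i, str(i))) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pairs = []
while not que.empty():
    pairs.append(que.get())

# Indices are unique, so sorting the tuples restores the start order.
results = [value for _, value in sorted(pairs)]
print(results)
```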
