Python threading.Thread can only be stopped via the private method self.__Thread_stop()


I have a function that accepts a large array of x,y pairs as input, does some elaborate curve fitting using numpy and scipy, and then returns a single value. To try to speed things up, I am feeding the data to two threads using Queue.Queue. Once the data is processed, I want the threads to terminate, the calling process to end, and control to return to the shell.

I am trying to understand why I have to resort to a private method of threading.Thread to stop my threads and return control to the command line.

Calling self.join() does not end the program. The only way to get control back was to use the private stop method.

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()

Here is an approximation of my code:

    import threading
    from Queue import Queue   # Python 2 module name; this code predates Python 3's queue

    class CalcThread(threading.Thread):
        def __init__(self, in_queue, out_queue, function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    # the queued dict holds the keyword arguments for self.function
                    tm = self.function(**params_for_function)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError:
                    # modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id, window, data_arrays):
        # do some analysis to calculate tm
        return tm

    if __name__ == "__main__":
        # options, well_ids and my_data_dict come from setup code omitted here
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue, out_queue, big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
            for i in well_ids:
                in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"

        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break

        # Calling this stop method does not seem to return control to the
        # command line unless I use the threading.Thread private method
        for aworker in workers:
            aworker.stop()
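
The reason self.join() never returns here is that once in_queue is empty, run() blocks forever inside in_queue.get(), so the finished event is never re-checked and the thread never exits. Below is a sketch of a run() that polls the queue with a short timeout instead (illustrative only; the 0.5 s interval is arbitrary). With this shape, stop() only needs self.finished.set() followed by a normal self.join():

    from Queue import Empty   # Python 3: from queue import Empty

    class CalcThread(threading.Thread):
        # __init__ as above; stop() reduces to self.finished.set()

        def run(self):
            while not self.finished.isSet():
                try:
                    # wake up at most every 0.5 s so self.finished is re-checked
                    params_for_function = self.in_queue.get(timeout=0.5)
                except Empty:
                    continue
                try:
                    tm = self.function(**params_for_function)
                    self.out_queue.put(tm)
                except ValueError:
                    # modify params and reinsert into queue
                    params_for_function["window"] += 1
                    self.in_queue.put(params_for_function)
                finally:
                    # every get() is matched by a task_done(), so in_queue.join()
                    # returns once all work (including retries) is finished
                    self.in_queue.task_done()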

3 Comments

情痴 2024-12-15 20:01:01


In general it is a bad idea to kill a thread that modifies a shared resource.

CPU-intensive tasks in multiple threads are worse than useless in Python unless you release the GIL while performing computations. Many numpy functions do release the GIL.

ThreadPoolExecutor example from the docs

import concurrent.futures  # on Python 2.x: pip install futures

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                          for args in calc_args)

    while future_to_args:
        # iterate over a copy so new futures can be added while we loop
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    # modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args
            else:
                print('%r returned %r' % (args, future.result()))

print("ALL WORK SEEMS TO BE DONE")

You could replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
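
A minimal sketch of that swap, assuming the submitted arguments are picklable and using a placeholder body for big_calculation (the real numpy/scipy fit would go there):

import concurrent.futures   # on Python 2.x: pip install futures

def big_calculation(args):
    # placeholder for the real numpy/scipy curve fit
    return args["well_id"], args["window"] ** 2

def main():
    calc_args = [dict(well_id=i, window=10) for i in range(20)]
    # Same interface as ThreadPoolExecutor, but each worker is a separate
    # process with its own GIL, so pure-Python CPU work scales too.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        for args, result in zip(calc_args, executor.map(big_calculation, calc_args)):
            print('%r returned %r' % (args, result))

if __name__ == '__main__':
    main()   # the guard matters for process pools: worker processes re-import the module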

遗心遗梦遗幸福 2024-12-15 20:01:01


To elaborate on my comment: if the sole purpose of your threads is to consume values from a Queue and perform a function on them, you're decidedly better off, IMHO, doing something like this:

from Queue import Queue        # Python 3: from queue import Queue
from threading import Thread

q = Queue()
results = []

def worker():
    while True:
        x, y = q.get()
        results.append(x ** y)
        q.task_done()

for _ in range(workerCount):   # workerCount: however many threads you want
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for tup in listOfXYs:          # listOfXYs: your iterable of (x, y) pairs
    q.put(tup)

q.join()

# Some more code here with the results list.

q.join() will block until every item that was put has been retrieved and marked with task_done(). The worker threads will keep trying to retrieve values, but won't find any, so they'll wait indefinitely once the queue is empty. When your script finishes its execution later, the worker threads will die because they're marked as daemon threads.
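
If you want the workers to finish on their own so the main thread can join() them, which is closer to what the question originally tried, a common variation on the same pattern is to enqueue one sentinel value per worker after the real work. A sketch using None as the sentinel:

from Queue import Queue        # Python 3: from queue import Queue
from threading import Thread

q = Queue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:        # sentinel: no more work for this thread
            q.task_done()
            return
        x, y = item
        results.append(x ** y)
        q.task_done()

workers = [Thread(target=worker) for _ in range(4)]
for t in workers:
    t.start()

for tup in [(2, 10), (3, 7), (5, 4)]:   # stand-in for listOfXYs
    q.put(tup)

for _ in workers:
    q.put(None)                 # one sentinel per worker

q.join()                        # all pairs and sentinels have been processed
for t in workers:
    t.join()                    # returns promptly: every worker has left its loop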

唠甜嗑 2024-12-15 20:01:01


I tried g.d.d.c's method and it produced an interesting result. I could get his exact x**y calculation to work just fine spread between the threads.

When I called my own function inside the worker's while True loop, I could only get the calculations to run across multiple threads if I put a time.sleep(1) in the for loop that calls the threads' start() method.

So in my code, without the time.sleep(1) the program gave me either a clean exit with no output, or in some cases:

"Exception in thread Thread-2 (most likely raised during interpreter shutdown):Exception in thread Thread-1 (most likely raised during interpreter shutdown):"

Once I added the time.sleep(), everything ran fine.

for aworker in range(5):
    t = Thread(target=worker)
    t.daemon = True
    t.start()
    # This sleep was essential or results for my specific function were None
    time.sleep(1)
    print "Started"