Python threading.Thread can be stopped only with private method self.__Thread_stop()
I have a function that accepts a large array of x,y pairs as input, does some elaborate curve fitting using numpy and scipy, and then returns a single value. To try to speed things up, I feed the data to two threads using Queue.Queue. Once the data is done, I want the threads to terminate, the calling process to end, and control to return to the shell.
I am trying to understand why I have to resort to a private method in threading.Thread to stop my threads and return control to the command line.
self.join() does not end the program. The only way to get back control was to use the private stop method.
def stop(self):
    print "STOP CALLED"
    self.finished.set()
    print "SET DONE"
    # self.join(timeout=None) does not work
    self._Thread__stop()
Here is an approximation of my code:
import threading
from Queue import Queue

class CalcThread(threading.Thread):
    def __init__(self,in_queue,out_queue,function):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function
        self.finished = threading.Event()

    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        self._Thread__stop()

    def run(self):
        while not self.finished.isSet():
            params_for_function = self.in_queue.get()
            try:
                tm = self.function(params_for_function)
                self.in_queue.task_done()
                self.out_queue.put(tm)
            except ValueError as v:
                #modify params and reinsert into queue
                window = params_for_function["window"]
                params_for_function["window"] = window + 1
                self.in_queue.put(params_for_function)

def big_calculation(well_id,window,data_arrays):
    # do some analysis to calculate tm
    return tm

if __name__ == "__main__":
    NUM_THREADS = 2
    workers = []
    in_queue = Queue()
    out_queue = Queue()
    for i in range(NUM_THREADS):
        w = CalcThread(in_queue,out_queue,big_calculation)
        w.start()
        workers.append(w)
    if options.analyze_all:
        for i in well_ids:
            in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))
    in_queue.join()
    print "ALL THREADS SEEM TO BE DONE"
    # gather data and report it from out_queue
    for i in well_ids:
        p = out_queue.get()
        print p
        out_queue.task_done()
        # I had to do this to get the out_queue to proceed
        if out_queue.qsize() == 0:
            out_queue.join()
            break
    # Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method
    for aworker in workers:
        aworker.stop()
In general, it is a bad idea to kill a thread that modifies a shared resource.
CPU-intensive tasks in multiple threads are worse than useless in Python unless you release the GIL while performing the computations. Many numpy functions do release the GIL.
See the ThreadPoolExecutor example from the docs.
You could replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
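As a rough Python 3 sketch of the executor approach (this is not the example from the docs): big_calculation here is a hypothetical stand-in for the real curve fit, and analyze_all, the sample data, and the max_workers value are assumptions made for illustration. The with block waits for the worker threads when it exits, so no stop() method is involved.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def big_calculation(well_id, window, data_arrays):
    """Hypothetical stand-in for the curve fit; returns a fake tm value."""
    return sum(data_arrays[well_id]) / float(window)


def analyze_all(well_ids, data_arrays, window=10, max_workers=2):
    """Run big_calculation for every well and return {well_id: tm}."""
    results = {}
    # Leaving the with-block shuts the pool down and waits for its threads.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(big_calculation, well_id, window, data_arrays): well_id
            for well_id in well_ids
        }
        for future in as_completed(futures):
            # result() re-raises any exception raised inside the worker.
            results[futures[future]] = future.result()
    return results


def main():
    # Made-up sample data, only to make the sketch runnable.
    data = {"A1": [1.0, 2.0, 3.0], "A2": [4.0, 5.0, 6.0]}
    for well_id, tm in sorted(analyze_all(list(data), data).items()):
        print(well_id, tm)


if __name__ == "__main__":
    main()
```

Swapping ThreadPoolExecutor for ProcessPoolExecutor should work with the same calls as long as the function and its arguments can be pickled; whether that holds for the real data is an assumption.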
To elaborate on my comment: if the sole purpose of your threads is to consume values from a Queue and perform a function on them, you're decidedly better off, IMHO, marking the workers as daemon threads that loop forever on the queue and calling q.join() from the main thread.
q.join() will block until every item that was put on the queue has been retrieved and marked done. The worker threads will continue to attempt to retrieve values but won't find any, so they'll wait indefinitely once the queue is empty. When your script finishes its execution later, the worker threads will die because they're marked as daemon threads.
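The daemon-worker pattern described above might look roughly like this in Python 3. This is a reconstruction under assumptions, not the answer's original code: the worker and run names, the second queue for results, and the x ** y payload are all chosen for illustration.

```python
import threading
from queue import Queue  # Python 3 name; Python 2 code imports from the Queue module


def worker(in_queue, out_queue):
    """Consume (x, y) pairs forever; as a daemon, the thread dies with the process."""
    while True:
        x, y = in_queue.get()
        try:
            out_queue.put((x, y, x ** y))
        finally:
            # Always mark the item done, or in_queue.join() never returns.
            in_queue.task_done()


def run(pairs, num_threads=2):
    """Compute x ** y for every pair on daemon threads and return sorted results."""
    in_queue, out_queue = Queue(), Queue()
    for _ in range(num_threads):
        threading.Thread(
            target=worker, args=(in_queue, out_queue), daemon=True
        ).start()
    for pair in pairs:
        in_queue.put(pair)
    in_queue.join()  # blocks until task_done() has been called once per item
    # Each result was put before its task_done(), so all of them are available now.
    return sorted(out_queue.get() for _ in pairs)


if __name__ == "__main__":
    print(run([(2, 3), (3, 2), (5, 2)]))
```

The main thread has to reach in_queue.join() before it exits; otherwise the daemon threads can be cut off before they have processed anything.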
I tried g.d.d.c's method and it produced an interesting result. His exact x**y calculation worked just fine spread across the threads.
When I called my own function inside the worker's while True loop, the calculations ran across multiple threads only if I put a time.sleep(1) in the for loop that calls each thread's start() method.
Without the time.sleep(1), my program either exited cleanly with no output or, in some cases, printed:
"Exception in thread Thread-2 (most likely raised during interpreter shutdown):Exception in thread Thread-1 (most likely raised during interpreter shutdown):"
Once I added the time.sleep(), everything ran fine.
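For what it's worth, the workers in the question block inside in_queue.get() once the queue drains, so they never get back to checking the finished event, and join() waits forever. One common way around that, sketched here in Python 3 under assumptions, is to send each worker a sentinel that tells it to leave run(). The _STOP object, run_all, the keyword call self.function(**params), and the dummy big_calculation (which fails until the window reaches 12) are all hypothetical and only there to make the sketch runnable.

```python
import threading
from queue import Queue

_STOP = object()  # hypothetical sentinel telling a worker to exit


class CalcThread(threading.Thread):
    def __init__(self, in_queue, out_queue, function):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function

    def run(self):
        while True:
            params = self.in_queue.get()
            try:
                if params is _STOP:
                    return  # leave run(); join() on this thread can now return
                try:
                    self.out_queue.put(self.function(**params))
                except ValueError:
                    # Widen the window and retry, as in the question's code.
                    params["window"] += 1
                    self.in_queue.put(params)
            finally:
                self.in_queue.task_done()  # exactly once per get()


def big_calculation(well_id, window, data_arrays):
    """Hypothetical stand-in: raises ValueError until the window reaches 12."""
    if window < 12:
        raise ValueError("window too small")
    return (well_id, window)


def run_all(well_ids, num_threads=2):
    in_queue, out_queue = Queue(), Queue()
    workers = [
        CalcThread(in_queue, out_queue, big_calculation) for _ in range(num_threads)
    ]
    for w in workers:
        w.start()
    for well_id in well_ids:
        in_queue.put(dict(well_id=well_id, window=10, data_arrays=None))
    in_queue.join()          # all real work, including retries, is finished
    for _ in workers:
        in_queue.put(_STOP)  # one sentinel per worker
    for w in workers:
        w.join()             # returns because run() has exited
    return sorted(out_queue.get() for _ in well_ids)


if __name__ == "__main__":
    print(run_all(["A1", "A2", "A3"]))
```

With this shape the threads are ordinary non-daemon threads, so the interpreter does not start shutting down while they are still running, and neither a sleep nor the private stop method should be needed.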