如何在 Python 线程池中使用初始化器
我正在尝试使用 PyFFTW 进行线程卷积,以便计算大量 同时进行 2D 卷积。 (不需要单独的进程,因为 GIL 已释放 用于 Numpy 运算)。 现在这是这样做的规范模型: http://code.activestate.com/recipes/577187-python-thread- pool/
(Py)FFTW 之所以如此快,是因为它重用了计划。这些必须为每个线程单独设置,以避免访问冲突错误,如下所示:
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
# Make separate fftw plans for each thread.
flag_for_fftw='patient'
self.inputa = np.zeros(someshape, dtype='float32')
self.outputa = np.zeros(someshape_semi, dtype='complex64')
# create a forward plan.
self.fft = fftw3.Plan(self.inputa,self.outputa, direction='forward', flags=[flag_for_fftw],nthreads=1)
# Initialize the arrays for the inverse fft.
self.inputb = np.zeros(someshape_semi, dtype='complex64')
self.outputb = np.zeros(someshape, dtype='float32')
# Create the backward plan.
self.ifft = fftw3.Plan(self.inputb,self.outputb, direction='backward', flags=[flag_for_fftw],nthreads=1)
self.start()
通过这种方式,可以传递参数 self.inputa
、self.outputa
、 self.fft
、self.inputb
、self.outputb
、self.ifft
到运行中的实际卷积器Worker 类中的方法。
这一切都很好,但我们不妨导入 ThreadPool 类:
from multiprocessing.pool import ThreadPool
但是我应该如何在 ThreadPool 中定义初始化程序以获得相同的结果? 根据文档 http://docs.python.org/library/multiprocessing.html “每个工作进程在启动时都会调用initializer(*initargs)”。 您可以在 Python 源代码中轻松检查这一点。
但是,当您设置线程池时,例如使用 2 个线程:
po = ThreadPool(2,initializer=tobedetermined)
并且您运行它,也许在某个循环中
po.apply_async(convolver,(some_input,))
如何使卷积器由初始化程序设置?你怎样才能让它单独使用 每个线程中的 FFTW 计划,无需为每个卷积重新计算 FFTW 计划?
干杯, 亚历克斯.
I am trying to do threaded convolution using PyFFTW, in order to calculate a large number of
2D convolutions simultaneously.
(One does not need separate processes, since the GIL is released
for Numpy operations).
Now here is the canonical model for doing so:
http://code.activestate.com/recipes/577187-python-thread-pool/
(Py)FFTW is so fast because it reuses plans. These have to be setup separately for each thread in order to avoid access violation errors, like this:
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
# Make separate fftw plans for each thread.
flag_for_fftw='patient'
self.inputa = np.zeros(someshape, dtype='float32')
self.outputa = np.zeros(someshape_semi, dtype='complex64')
# create a forward plan.
self.fft = fftw3.Plan(self.inputa,self.outputa, direction='forward', flags=[flag_for_fftw],nthreads=1)
# Initialize the arrays for the inverse fft.
self.inputb = np.zeros(someshape_semi, dtype='complex64')
self.outputb = np.zeros(someshape, dtype='float32')
# Create the backward plan.
self.ifft = fftw3.Plan(self.inputb,self.outputb, direction='backward', flags=[flag_for_fftw],nthreads=1)
self.start()
In this way one can pass the arguments self.inputa
, self.outputa
, self.fft
, self.inputb
, self.outputb
, self.ifft
to the actual convolver within the run method in the Worker class.
This is all nice, but we might as well import the ThreadPool class:
from multiprocessing.pool import ThreadPool
But how should I define the initializer in ThreadPool to get the same result?
According to the docs
http://docs.python.org/library/multiprocessing.html
"each worker process will call initializer(*initargs) when it starts".
You can easily check this in the Python source code.
However, when you set up the Threadpool, for example with 2 threads:
po = ThreadPool(2,initializer=tobedetermined)
and you run it, perhaps in some loop
po.apply_async(convolver,(some_input,))
how can you make convolver be setup by initializer? How can you make it use separate
FFTW plans in each thread, without recomputing the FFTW plan for every convolution?
Cheers,
Alex.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您可以使用线程本地存储 (
threading.local()
) 的函数包装卷积器调用来初始化 PyFFTW 并记住结果you can wrap the convolver call with a function that uses Thread Local Storage (
threading.local()
) to initialize PyFFTW and remember the result