multiprocessing.Pool - PicklingError:无法pickle<类型 “thread.lock”>:属性查找thread.lock失败类型>
multiprocessing.Pool
让我发疯......
我想升级许多软件包,对于每一个软件包,我都必须检查是否有更高的版本。这是通过 check_one
函数完成的。
主要代码位于 Updater.update
方法中:我在其中创建 Pool 对象并调用 map()
方法。
这是代码:
def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json['info']['version'])
except Exception as e:
logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))
class Updater(FileManager):
# __init__ and other methods...
def update(self):
logger.info('Searching for updates')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
new_version,
version)
u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info('{0} has not been upgraded', package)
self._clean()
logger.success('Updating finished successfully')
当我运行它时,我收到这个奇怪的错误:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
multiprocessing.Pool
is driving me crazy...
I want to upgrade many packages, and for every one of them I have to check whether there is a greater version or not. This is done by the check_one
function.
The main code is in the Updater.update
method: there I create the Pool object and call the map()
method.
Here is the code:
def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json['info']['version'])
except Exception as e:
logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))
class Updater(FileManager):
# __init__ and other methods...
def update(self):
logger.info('Searching for updates')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
new_version,
version)
u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info('{0} has not been upgraded', package)
self._clean()
logger.success('Updating finished successfully')
When I run it I get this weird error:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
多处理通过
mp.SimpleQueue
将任务(包括check_one
和data
)传递给工作进程。与 Queue.Queue 不同,放入 mp.SimpleQueue 中的所有内容都必须是可选取的。Queue.Queue
不可选取:产生此异常:
您的
data
包含packages
,它是一个 Queue.Queue。这可能就是问题的根源。以下是一个可能的解决方法:
Queue
用于两个目的:qsize
)为了在多个进程之间共享值,我们可以使用
mp.Value
,而不是调用qsize
。我们可以(并且应该)只从对
check_one
的调用中返回值,而不是将结果存储在队列中。 pool.map 将结果收集到自己创建的队列中,并将结果作为 pool.map 的返回值返回。例如:
multiprocessing passes tasks (which include
check_one
anddata
) to the worker processes through amp.SimpleQueue
. UnlikeQueue.Queue
s, everything put in themp.SimpleQueue
must be pickable.Queue.Queue
s are not pickable:yields this exception:
Your
data
includespackages
, which is a Queue.Queue. That might be the source of the problem.Here is a possible workaround: The
Queue
is being used for two purposes:qsize
)Instead of calling
qsize
, to share a value between multiple processes, we could use amp.Value
.Instead of storing results in a queue, we can (and should) just return values from calls to
check_one
. Thepool.map
collects the results in a queue of its own making, and returns the results as the return value ofpool.map
.For example:
经过对类似问题的大量挖掘之后...
事实证明,任何恰好包含 threading.Condition() 对象的对象都永远不会与 multiprocessing.Pool 一起使用。
这是一个示例,
我使用 Python 2.7.5 运行它并遇到相同的错误:
但随后在 python 3.4.1 上运行它并且此问题已得到修复。
尽管对于我们这些仍然使用 2.7.x 的人来说,我还没有遇到任何有用的解决方法。
After a lot of digging on a similar issue...
It also turns out that ANY object that happens to contain a threading.Condition() object will NEVER NEVER work with multiprocessing.Pool.
Here is an example
I'm ran this with Python 2.7.5 and hit the same error:
But then ran it on python 3.4.1 and this issue has been fixed.
Although I haven't come across any useful workarounds yet for those of us still on 2.7.x.
我在 docker 上使用 python 3.6 版本时遇到了这个问题。更改版本为3.7.3,解决。
I experienced this issue with python version 3.6 on docker. Changed the version to 3.7.3 and it was solved.