multiprocessing.Pool - PicklingError:无法pickle<类型 “thread.lock”>:属性查找thread.lock失败

发布于 2024-12-11 13:41:43 字数 2505 浏览 0 评论 0原文

multiprocessing.Pool 让我发疯......
我想升级许多软件包,对于每一个软件包,我都必须检查是否有更高的版本。这是通过 check_one 函数完成的。
主要代码位于 Updater.update 方法中:我在其中创建 Pool 对象并调用 map() 方法。

这是代码:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

当我运行它时,我收到这个奇怪的错误:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

multiprocessing.Pool is driving me crazy...
I want to upgrade many packages, and for every one of them I have to check whether there is a greater version or not. This is done by the check_one function.
The main code is in the Updater.update method: there I create the Pool object and call the map() method.

Here is the code:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

When I run it I get this weird error:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

笑梦风尘 2024-12-18 13:41:43

多处理通过 mp.SimpleQueue 将任务(包括 check_onedata)传递给工作进程。与 Queue.Queue 不同,放入 mp.SimpleQueue 中的所有内容都必须是可选取的。 Queue.Queue 不可选取:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

产生此异常:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

您的 data 包含 packages,它是一个 Queue.Queue。这可能就是问题的根源。


以下是一个可能的解决方法:Queue 用于两个目的:

  1. 找出大致大小(通过调用 qsize
  2. 以存储结果以供以后检索。

为了在多个进程之间共享值,我们可以使用 mp.Value,而不是调用 qsize

我们可以(并且应该)只从对 check_one 的调用中返回值,而不是将结果存储在队列中。 pool.map 将结果收集到自己创建的队列中,并将结果作为 pool.map 的返回值返回。

例如:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()

multiprocessing passes tasks (which include check_one and data) to the worker processes through a mp.SimpleQueue. Unlike Queue.Queues, everything put in the mp.SimpleQueue must be pickable. Queue.Queues are not pickable:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

yields this exception:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

Your data includes packages, which is a Queue.Queue. That might be the source of the problem.


Here is a possible workaround: The Queue is being used for two purposes:

  1. to find out the approximate size (by calling qsize)
  2. to store results for later retrieval.

Instead of calling qsize, to share a value between multiple processes, we could use a mp.Value.

Instead of storing results in a queue, we can (and should) just return values from calls to check_one. The pool.map collects the results in a queue of its own making, and returns the results as the return value of pool.map.

For example:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()
溺孤伤于心 2024-12-18 13:41:43

经过对类似问题的大量挖掘之后...

事实证明,任何恰好包含 threading.Condition() 对象的对象都永远不会与 multiprocessing.Pool 一起使用。

这是一个示例,

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

我使用 Python 2.7.5 运行它并遇到相同的错误:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

但随后在 python 3.4.1 上运行它并且此问题已得到修复。

尽管对于我们这些仍然使用 2.7.x 的人来说,我还没有遇到任何有用的解决方法。

After a lot of digging on a similar issue...

It also turns out that ANY object that happens to contain a threading.Condition() object will NEVER NEVER work with multiprocessing.Pool.

Here is an example

import multiprocessing as mp
import threading

class MyClass(object):
   def __init__(self):
      self.cond = threading.Condition()

def foo(mc):
   pass

pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))

I'm ran this with Python 2.7.5 and hit the same error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
  File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

But then ran it on python 3.4.1 and this issue has been fixed.

Although I haven't come across any useful workarounds yet for those of us still on 2.7.x.

吾家有女初长成 2024-12-18 13:41:43

我在 docker 上使用 python 3.6 版本时遇到了这个问题。更改版本为3.7.3,解决。

I experienced this issue with python version 3.6 on docker. Changed the version to 3.7.3 and it was solved.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文