Python: subclassing multiprocessing.Process

Published 2024-12-21 05:23:58


I am new to object-oriented Python, and I am rewriting my existing application as an object-oriented version, because the number of developers is increasing and my code is becoming unmaintainable.

Normally I use multiprocessing queues, but I found from this example http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html that I can subclass multiprocessing.Process, so I thought it was a good idea and wrote a class to test it, like this:

code:

from multiprocessing import Process

class Processor(Process):
    def return_name(self):
        return "Process %s" % self.name

    def run(self):
        return self.return_name()

processes = []

if __name__ == "__main__":
    for i in range(0, 5):
        p = Processor()
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

However, I cannot get the values back. How can I use queues in this way?

EDIT: I want to get the return values and am trying to figure out where to put the Queue().

Comments (4)

笑看君怀她人 2024-12-28 05:23:58


Subclassing multiprocessing.Process:

However, I cannot get the values back. How can I use queues in this way?

Process needs a Queue() to receive the results... An example of how to subclass multiprocessing.Process follows...

from multiprocessing import Process, Queue
class Processor(Process):

    def __init__(self, queue, idx, **kwargs):
        super(Processor, self).__init__()
        self.queue = queue
        self.idx = idx
        self.kwargs = kwargs

    def run(self):
        """Build some CPU-intensive tasks to run via multiprocessing here."""
        hash(frozenset(self.kwargs.items())) # Shameless usage of CPU for no gain...

        ## Return some information back through multiprocessing.Queue
        ## NOTE: self.name is an attribute of multiprocessing.Process
        self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name))

if __name__ == "__main__":
    NUMBER_OF_PROCESSES = 5

    ## Create a list to hold running Processor object instances...
    processes = list()

    q = Queue()  # Build a single queue to send to all process objects...
    for i in range(0, NUMBER_OF_PROCESSES):
        p = Processor(queue=q, idx=i)
        p.start()
        processes.append(p)

    # Incorporating ideas from this answer, below...
    #    https://stackoverflow.com/a/42137966/667301
    [proc.join() for proc in processes]
    while not q.empty():
        print("RESULT: {0}".format(q.get()))   # get results from the queue...

On my machine, this results in...

$ python test.py
RESULT: Process idx=0 is called 'Processor-1'
RESULT: Process idx=4 is called 'Processor-5'
RESULT: Process idx=3 is called 'Processor-4'
RESULT: Process idx=1 is called 'Processor-2'
RESULT: Process idx=2 is called 'Processor-3'
$

Using multiprocessing.Pool:

FWIW, one disadvantage I've found to subclassing multiprocessing.Process is that you can't leverage all the built-in goodness of multiprocessing.Pool; Pool gives you a very nice API if you don't need your producer and consumer code to talk to each other through a queue.

You can do a lot just with some creative return values... in the following example, I use a dict() to encapsulate input and output values from pool_job()...

from multiprocessing import Pool

def pool_job(input_val=0):
    # NOTE: Pool.map() returns results in input order; only the *execution*
    # order across worker processes is unpredictable.
    # dict format is {input: output}...
    return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}

if __name__ == "__main__":
    pool = Pool(5)  # Use 5 multiprocessing processes to handle jobs...
    results = pool.map(pool_job, range(0, 12))  # map range(0, 12) into pool_job()
    print(results)

This results in:

[
    {'pool_job(input_val=0)': 0}, 
    {'pool_job(input_val=1)': 12}, 
    {'pool_job(input_val=2)': 24}, 
    {'pool_job(input_val=3)': 36}, 
    {'pool_job(input_val=4)': 48}, 
    {'pool_job(input_val=5)': 60}, 
    {'pool_job(input_val=6)': 72}, 
    {'pool_job(input_val=7)': 84}, 
    {'pool_job(input_val=8)': 96}, 
    {'pool_job(input_val=9)': 108}, 
    {'pool_job(input_val=10)': 120}, 
    {'pool_job(input_val=11)': 132}
]

Obviously there are plenty of other improvements to be made in pool_job(), such as error handling, but this illustrates the essentials. FYI, this answer provides another example of how to use multiprocessing.Pool.
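To sketch the error handling mentioned above, one option is to catch exceptions inside the job and report them in the returned dict, so one bad input can't kill the whole map(). The function name and dict layout below are my own illustration, not part of the original answer:

```python
from multiprocessing import Pool

def safe_pool_job(input_val):
    """Catch per-item failures and report them instead of raising."""
    try:
        return {'input': input_val, 'output': int(input_val) * 12, 'error': None}
    except (TypeError, ValueError) as exc:
        # A bad input becomes a result entry rather than a crashed worker...
        return {'input': input_val, 'output': None, 'error': str(exc)}

if __name__ == "__main__":
    with Pool(5) as pool:
        results = pool.map(safe_pool_job, [0, 1, 'two', 3])
    print(results)
```

With the 'two' input, the third result carries an 'error' string while the others carry normal outputs.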

嗼ふ静 2024-12-28 05:23:58


Thanks a lot everyone.

Now here's how I got it done :)

In this example I use multiple queues, since I do not want the processes to communicate with each other, only with the parent process.

from multiprocessing import Process, Queue

class Processor(Process):
    def __init__(self, queue):
        Process.__init__(self)
        self.que = queue

    def get_name(self):
        return "Process %s" % self.name

    def run(self):
        self.que.put(self.get_name())

if __name__ == "__main__":
    processes = []
    for i in range(0, 5):
        p = Processor(Queue())
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
        print(p.que.get())

难得心□动 2024-12-28 05:23:58


The return value of Process.run doesn't go anywhere. You need to send the results back to the parent process, e.g. using a multiprocessing.Queue (docs here).
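A minimal sketch of that idea (the worker function and the value 42 are illustrative, not from the question):

```python
from multiprocessing import Process, Queue

def worker(q):
    # Return values from run()/target are discarded,
    # so push results onto a queue instead...
    q.put(42)

if __name__ == "__main__":
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    result = q.get()  # receives 42 from the child process
    p.join()
    print(result)
```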

迷雾森÷林ヴ 2024-12-28 05:23:58


Mike's answer is the best, but just for completeness I want to mention that I prefer harvesting the queue outside of the join loop, so the last bit would look like this:

[proc.join() for proc in processes]  # 1. join

while not q.empty():  # 2. get the results
    print("RESULT: %s" % q.get())
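One caveat worth adding: the multiprocessing docs describe Queue.empty() as unreliable, and joining before draining can deadlock if a child blocks on a full queue. When each process puts exactly one result, a safer pattern (my own variant, not from the answer above) is to get() a known count before joining:

```python
from multiprocessing import Process, Queue

def worker(q, idx):
    q.put("result from process %d" % idx)

if __name__ == "__main__":
    q = Queue()
    processes = [Process(target=worker, args=(q, i)) for i in range(5)]
    for p in processes:
        p.start()
    # Each child puts exactly one item, so drain exactly that many
    # *before* joining -- no reliance on Queue.empty()...
    results = [q.get() for _ in processes]
    for p in processes:
        p.join()
    print(sorted(results))
```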