How to dynamically create per-process queues in Python multiprocessing

Published 2024-11-29 11:43:13 · 2,411 characters · 1 view · 0 comments

I want to dynamically create multiple Processes, where each instance has a queue for incoming messages from other instances, and each instance can also create new instances. So we end up with a network of processes all sending to each other. Every instance is allowed to send to every other.

The code below would do what I want: it uses a Manager.dict() to store the queues, making sure updates are propagated, and a Lock() to protect write-access to the queues. However when adding a new queue it throws "RuntimeError: Queue objects should only be shared between processes through inheritance".

The problem is that when starting-up, we don't know how many queues will eventually be needed, so we have to create them dynamically. But since we can't share queues except at construction time, I don't know how to do that.
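As an editorial aside: one workaround not explored in the question is to store Manager-created queue *proxies* (`manager.Queue()`) in the shared dict instead of raw `Queue` objects. A minimal sketch, assuming the extra indirection of a managed queue is acceptable:

```python
# Sketch of a possible workaround (an editorial addition, not from the
# post): queues created by a Manager are proxies, so unlike raw
# multiprocessing.Queue objects they can be pickled and stored in a
# Manager.dict even after processes have started.
from multiprocessing import Manager, Process

def worker(queues, my_id):
    # The child looks its queue up dynamically through the shared dict.
    print(queues[my_id].get())

if __name__ == "__main__":
    manager = Manager()
    queues = manager.dict()
    queues[0] = manager.Queue()   # a proxy: no RuntimeError here
    p = Process(target=worker, args=(queues, 0))
    p.start()
    queues[0].put("hello from the parent")
    p.join()
```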

I know that one possibility would be to make queues a global variable instead of a managed one passed-in to __init__: the problem then, as I understand it, is that additions to the queues variable wouldn't be propagated to other processes.

EDIT I'm working on evolutionary algorithms. EAs are a type of machine learning technique. An EA simulates a "population", which evolves by survival of the fittest, crossover, and mutation. In parallel EAs, as here, we also have migration between populations, corresponding to interprocess communication. Islands can also spawn new islands, and so we need a way to send messages between dynamically-created processes.

import random, time
from multiprocessing import Process, Queue, Lock, Manager, current_process
try:
    from queue import Empty as EmptyQueueException
except ImportError:
    from Queue import Empty as EmptyQueueException

class MyProcess(Process):
    def __init__(self, queues, lock):
        super(MyProcess, self).__init__()
        self.queues = queues
        self.lock = lock
        # acquire lock and add a new queue for this process
        with self.lock:
            self.id = len(list(self.queues.keys()))
            self.queues[self.id] = Queue()

    def run(self):
        while len(list(self.queues.keys())) < 10:

            # make a new process
            new = MyProcess(self.queues, self.lock)
            new.start()

            # send a message to a random process
            dest_key = random.choice(list(self.queues.keys()))
            dest = self.queues[dest_key]
            dest.put("hello to %s from %s" % (dest_key, self.id))

            # receive messages
            message = True
            while message:
                try:
                    message = self.queues[self.id].get(False) # don't block
                    print("%s received: %s" % (self.id, message))
                except EmptyQueueException:
                    break

            # what queues does this process know about?
            print("%d: I know of %s" %
                  (self.id, " ".join([str(id) for id in self.queues.keys()])))

            time.sleep(1)

if __name__ == "__main__":
    # Construct MyProcess with a Manager.dict for storing the queues
    # and a lock to protect write access. Start.
    MyProcess(Manager().dict(), Lock()).start()

Comments (3)

極樂鬼 2024-12-06 11:43:13

I'm not entirely sure what your use case actually is here. Perhaps if you elaborate a bit more on why you want to have each process dynamically spawn a child with a connected queue it'll be a bit more clear what the right solution would be in this situation.

Anyway, with the question as it is, it seems that there is not really a good way to dynamically create pipes or queues with multiprocessing right now.

I think that if you're willing to spawn threads within each of your processes you may be able to use multiprocessing.connection.Listener/Client to communicate back and forth. Rather than spawning threads, I took an approach using network sockets and select to communicate between processes.
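The Listener/Client idea mentioned above can be sketched roughly like this (an editorial sketch; the address, authkey, and thread layout are assumptions, not the answerer's code):

```python
# Editorial sketch of multiprocessing.connection.Listener/Client: a
# background thread listens while the main thread connects and sends.
# Connections send/recv arbitrary picklable objects.
import threading
from multiprocessing.connection import Client, Listener

AUTHKEY = b"secret"  # hypothetical shared key

def listen(address, results, ready):
    # Bind, signal readiness, then accept one connection and record one message.
    with Listener(address, authkey=AUTHKEY) as listener:
        ready.set()
        with listener.accept() as conn:
            results.append(conn.recv())

def demo(port):
    # One round trip: a background listener thread plus a Client send.
    address = ("127.0.0.1", port)
    received, ready = [], threading.Event()
    t = threading.Thread(target=listen, args=(address, received, ready), daemon=True)
    t.start()
    ready.wait()
    with Client(address, authkey=AUTHKEY) as conn:
        conn.send("hello")
    t.join()
    return received

if __name__ == "__main__":
    print(demo(6001))  # -> ['hello']
```

Listener also accepts `family='AF_UNIX'` with a filesystem path, which is the lighter-weight unix-domain-socket variant mentioned below.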

Dynamic process spawning and network sockets may still be flaky depending on how multiprocessing cleans up your file descriptors when spawning/forking a new process and your solution will most likely work more easily on *nix derivatives. If you're concerned about socket overhead you could use unix domain sockets to be a little more lightweight at the cost of added complexity running nodes on multiple worker machines.

Anyway, here's an example using network sockets and a global process list to accomplish this since I was unable to find a good way to make multiprocessing do it.

import collections
import multiprocessing
import random
import select
import socket
import time


class MessagePassingProcess(multiprocessing.Process):
    def __init__(self, id_, processes):
        self.id = id_
        self.processes = processes
        self.queue = collections.deque()
        super(MessagePassingProcess, self).__init__()

    def run(self):
        print "Running"
        inputs = []
        outputs = []
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = self.processes[self.id]["address"]
        print "Process %s binding to %s"%(self.id, address)
        server.bind(address)
        server.listen(5)
        inputs.append(server)
        process = self.processes[self.id]
        process["listening"] = True
        self.processes[self.id] = process
        print "Process %s now listening!(%s)"%(self.id, process)
        while inputs:
            readable, writable, exceptional = select.select(inputs,
                                                           outputs,
                                                           inputs,
                                                           0.1)
            for sock in readable:
                print "Process %s has a readable socket: %s"%(self.id,
                                                              sock)
                if sock is server:
                    print "Process %s has a readable server socket: %s"%(self.id,
                                                              sock)
                    conn, addr = sock.accept()
                    conn.setblocking(0)
                    inputs.append(conn)
                else:
                    data = sock.recv(1024)
                    if data:
                        self.queue.append(data)
                        print "non server readable socket with data"
                    else:
                        inputs.remove(sock)
                        sock.close()
                        print "non server readable socket with no data"

            for sock in exceptional:
                print "exception occurred on socket %s"%(sock)
                inputs.remove(sock)
                sock.close()

            while len(self.queue) >= 1:
                print "Received:", self.queue.pop()

            # send a message to a random process:
            random_id = random.choice(list(self.processes.keys()))
            print "%s Attempting to send message to %s"%(self.id, random_id)
            random_process = self.processes[random_id]
            print "random_process:", random_process
            if random_process["listening"]:
                random_address = random_process["address"]
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(random_address)
                except socket.error:
                    print "%s failed to send to %s"%(self.id, random_id)
                else:
                    s.send("Hello World!")                    
                finally:
                    s.close()

            time.sleep(1)

if __name__=="__main__":
    print "hostname:", socket.getfqdn()
    print dir(multiprocessing)
    manager = multiprocessing.Manager()
    processes = manager.dict()
    joinable = []
    for n in xrange(multiprocessing.cpu_count()):
        mpp = MessagePassingProcess(n, processes)
        processes[n] = {"id":n,
                        "address":("127.0.0.1",7000+n),
                        "listening":False,
                        }
        print "processes[%s] = %s"%(n, processes[n])
        mpp.start()
        joinable.append(mpp)
    for process in joinable:
        process.join()

With a lot of polish and testing love this might be a logical extension to multiprocessing.Process and/or multiprocessing.Pool as this does seem like something people would use if it were available in the standard lib. It may also be reasonable to create a DynamicQueue class that uses sockets to be discoverable to other queues.
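For what it's worth, that DynamicQueue suggestion might look roughly like this (everything here is hypothetical: the class name, the registry, and the use of Manager queue proxies instead of raw sockets are editorial choices for brevity):

```python
# Hypothetical sketch of a discoverable "DynamicQueue": each queue
# publishes itself by name in a shared registry (a Manager.dict), so any
# process holding the registry can look up a destination and send to it.
# Manager queue proxies stand in for the socket transport suggested above.
from multiprocessing import Manager

class DynamicQueue:
    def __init__(self, name, registry, manager):
        self.name = name
        self.registry = registry
        self.queue = manager.Queue()
        registry[name] = self.queue  # publish the proxy for discovery

    def send_to(self, name, msg):
        # Look the destination up dynamically at send time.
        self.registry[name].put(msg)

    def get(self, timeout=None):
        return self.queue.get(timeout=timeout)

if __name__ == "__main__":
    manager = Manager()
    registry = manager.dict()
    a = DynamicQueue("a", registry, manager)
    b = DynamicQueue("b", registry, manager)
    a.send_to("b", "hello b")
    print(b.get(timeout=5))  # -> hello b
```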

Anyway, hope it helps. Please update if you figure out a better way to make this work.

萌︼了一个春 2024-12-06 11:43:13

This code is based on the accepted answer. It's in Python 3 since OSX Snow Leopard segfaults on some uses of multiprocessing stuff.

#!/usr/bin/env python3

import collections
from multiprocessing import Process, Manager, Lock, cpu_count
import random
import select
import socket
import time
import pickle

class Message:
    def __init__(self, origin):
        self.type = "long_msg"
        self.data = "X" * 3000
        self.origin = origin
    def __str__(self):
        return "%s %d" % (self.type, self.origin)

class MessagePassingProcess(Process):
    def __init__(self, processes, lock):
        self.lock = lock
        self.processes = processes
        with self.lock:
            self.id = len(list(processes.keys()))
            process_dict = {"id": self.id,
                            "address": ("127.0.0.1", 7000 + self.id),
                            "listening": False
                            }
            self.processes[self.id] = process_dict
        print("new process: processes[%s] = %s" % (self.id, processes[self.id]))
        self.queue = collections.deque()
        super(MessagePassingProcess, self).__init__()

    def run(self):
        print("Running")
        self.processes[self.id]["joinable"] = True
        inputs = []
        outputs = []
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        address = self.processes[self.id]["address"]
        print("Process %s binding to %s" % (self.id, address))
        server.bind(address)
        server.listen(5)
        inputs.append(server)
        process = self.processes[self.id]
        process["listening"] = True
        self.processes[self.id] = process
        print("Process %s now listening!(%s)" % (self.id, process))
        while inputs and len(list(self.processes.keys())) < 10:
            readable, writable, exceptional = select.select(inputs,
                                                           outputs,
                                                           inputs,
                                                           0.1)
            # read incoming messages
            for sock in readable:
                print("Process %s has a readable socket: %s" % (self.id, sock))
                if sock is server:
                    print("Process %s has a readable server socket: %s" %
                          (self.id, sock))
                    conn, addr = sock.accept()
                    conn.setblocking(0)
                    inputs.append(conn)
                else:
                    data = True
                    item = bytes() # empty bytes object, to be added to
                    recvs = 0
                    while data:
                        data = sock.recv(1024)
                        item += data
                        recvs += 1
                    if len(item):
                        self.queue.append(item)
                        print("non server readable socket: recvd %d bytes in %d parts"
                              % (len(item), recvs))
                    else:
                        inputs.remove(sock)
                        sock.close()
                        print("non server readable socket: nothing to read")

            for sock in exceptional:
                print("exception occurred on socket %s" % (sock))
                inputs.remove(sock)
                sock.close()

            while len(self.queue):
                msg = pickle.loads(self.queue.pop())
                print("received:" + str(msg))

            # send a message to a random process:
            random_id = random.choice(list(self.processes.keys()))
            print("%s attempting to send message to %s" % (self.id, random_id))
            random_process = self.processes[random_id]
            if random_process["listening"]:
                random_address = random_process["address"]
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect(random_address)
                except socket.error:
                    print("%s failed to send to %s"%(self.id, random_id))
                else:
                    item = pickle.dumps(Message(self.id))
                    print("sending a total of %d bytes" % len(item))
                    s.sendall(item)
                finally:
                    s.close()

            # make a new process
            if random.random() < 0.1:
                mpp = MessagePassingProcess(self.processes, self.lock)
                mpp.start()
            else:
                time.sleep(1.0)
        print("process %d finished looping" % self.id)


if __name__=="__main__":
    manager = Manager()
    processes = manager.dict()
    lock = Lock()
    # make just one process: it will make more
    mpp = MessagePassingProcess(processes, lock)
    mpp.start()
    # this doesn't join on all the other processes created
    # subsequently
    mpp.join()
べ映画 2024-12-06 11:43:13

The standard library socketserver is provided to help avoid programming select() manually. In this version, we start a socketserver in a separate thread so that each Process can do (well, pretend to do) computation in its main loop.

#!/usr/bin/env python3

# Each Node is an mp.Process. It opens a client-side socket to send a
# message to another Node. Each Node listens using a separate thread
# running a socketserver (so avoiding manual programming of select()),
# which itself starts a new thread to handle each incoming connection.
# The socketserver puts received messages on an mp.Queue, where they
# are picked up by the Node for processing once per loop. This setup
# allows the Node to do computation in its main loop.

import multiprocessing as mp
import threading, random, socket, socketserver, time, pickle, queue

class Message:
    def __init__(self, origin):
        self.type = "long_message"
        self.data = "X" * random.randint(0, 2000)
        self.origin = origin
    def __str__(self):
        return "Message of type %s, length %d from %d" % (
            self.type, len(self.data), self.origin)

class Node(mp.Process):
    def __init__(self, nodes, lock):
        super().__init__()

        # Add this node to the Manager.dict of node descriptors.
        # Write-access is protected by a Lock.
        self.nodes = nodes
        self.lock = lock
        with self.lock:
            self.id = len(list(nodes.keys()))
            host = "127.0.0.1"
            port = 7022 + self.id
            node = {"id": self.id, "address": (host, port), "listening": False}
            self.nodes[self.id] = node
        print("new node: nodes[%s] = %s" % (self.id, nodes[self.id]))

        # Set up socketserver.

        # don't know why collections.deque or queue.Queue don't work here.
        self.queue = mp.Queue()

        # This MixIn usage is directly from the python.org
        # socketserver docs
        class ThreadedTCPServer(socketserver.ThreadingMixIn,
                                socketserver.TCPServer):
            pass
        class HandlerWithQueue(socketserver.BaseRequestHandler):
            # Something of a hack: using class variables to give the
            # Handler access to this Node-specific data
            handler_queue = self.queue
            handler_id = self.id
            def handle(self):
                # could receive data in multiple chunks, so loop and
                # concatenate
                item = bytes()
                recvs = 0
                data = True
                while data:  # loop until the sender closes the connection
                    data = self.request.recv(4096)
                    item += data
                    recvs += 1
                if len(item):
                    # Receive a pickle here and put it straight on
                    # queue. Will be unpickled when taken off queue.
                    print("%d: socketserver received %d bytes in %d recv()s"
                          % (self.handler_id, len(item), recvs))
                    self.handler_queue.put(item)

        self.server = ThreadedTCPServer((host, port), HandlerWithQueue)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.setDaemon(True) # Tell it to exit when Node exits.
        self.server_thread.start()
        print("%d: server loop running in thread %s" %
              (self.id, self.server_thread.getName()))

        # Now ready to receive
        with self.lock:
            # Careful: if we assign directly to
            # self.nodes[self.id]["listening"], the new value *won't*
            # be propagated to other Nodes by the Manager.dict. Have
            # to use this hack to re-assign the Manager.dict key.
            node = self.nodes[self.id]
            node["listening"] = True
            self.nodes[self.id] = node

    def send(self):
        # Find a destination. All listening nodes are eligible except self.
        dests = [node for node in self.nodes.values()
                 if node["id"] != self.id and node["listening"]]
        if len(dests) < 1:
            print("%d: no node to send to" % self.id)
            return
        dest = random.choice(dests)
        print("%d: sending to %s" % (self.id, dest["id"]))

        # send
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(dest["address"])
        except socket.error:
            print("%s: failed to send to %s" % (self.id, dest["id"]))
        else:
            item = pickle.dumps(Message(self.id))
            s.sendall(item)
        finally:
            s.close()

    # Check our queue for incoming messages.
    def receive(self):
        while True:
            try:
                message = pickle.loads(self.queue.get(False))
                print("%d: received %s" % (self.id, str(message)))
            except queue.Empty:
                break

    def run(self):
        print("%d: in run()" % self.id)
        # Main loop. Loop until at least 10 Nodes exist. Because of
        # parallel processing we might get a few more
        while len(list(self.nodes.keys())) < 10:
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.send()
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.receive()
            # maybe make a new node
            if random.random() < 0.1:
                new = Node(self.nodes, self.lock)
                new.start()
        # Seems natural to call server_thread.shutdown() here, but it
        # hangs. But since we've set the thread to be a daemon, it
        # will exit when this process does.
        print("%d: finished" % self.id)

if __name__=="__main__":
    manager = mp.Manager()
    nodes = manager.dict()
    lock = mp.Lock()
    # make just one node: it will make more
    node0 = Node(nodes, lock)
    node0.start()
    # This doesn't join on all the other nodes created subsequently.
    # But everything seems to work out ok.
    node0.join()
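Both socket answers above treat the sender closing the connection as the end-of-message marker, which is why the receivers loop on recv() until it returns nothing. As an editorial aside (the function names here are invented), a common alternative is to length-prefix each pickled message, so a single connection can carry many messages:

```python
# Length-prefix framing sketch: an alternative to the close-to-delimit
# approach used in the answers above.
import pickle
import struct

def frame(obj):
    # 4-byte big-endian length header followed by the pickled payload.
    payload = pickle.dumps(obj)
    return struct.pack(">I", len(payload)) + payload

def unframe(buf):
    # Parse one framed message from the front of buf; return (obj, rest).
    (length,) = struct.unpack(">I", buf[:4])
    payload, rest = buf[4:4 + length], buf[4 + length:]
    return pickle.loads(payload), rest

if __name__ == "__main__":
    # Two messages back to back on one "stream" still separate cleanly.
    stream = frame({"origin": 0}) + frame("hello")
    first, stream = unframe(stream)
    second, stream = unframe(stream)
    print(first, second)  # -> {'origin': 0} hello
```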

The standard library socketserver is provided to help avoid programming select() manually. In this version, we start a socketserver in a separate thread so that each Process can do (well, pretend to do) computation in its main loop.

#!/usr/bin/env python3

# Each Node is an mp.Process. It opens a client-side socket to send a
# message to another Node. Each Node listens using a separate thread
# running a socketserver (so avoiding manual programming of select()),
# which itself starts a new thread to handle each incoming connection.
# The socketserver puts received messages on an mp.Queue, where they
# are picked up by the Node for processing once per loop. This setup
# allows the Node to do computation in its main loop.

import multiprocessing as mp
import threading, random, socket, socketserver, time, pickle, queue

class Message:
    def __init__(self, origin):
        self.type = "long_message"
        self.data = "X" * random.randint(0, 2000)
        self.origin = origin
    def __str__(self):
        return "Message of type %s, length %d from %d" % (
            self.type, len(self.data), self.origin)

class Node(mp.Process):
    def __init__(self, nodes, lock):
        super().__init__()

        # Add this node to the Manager.dict of node descriptors.
        # Write-access is protected by a Lock.
        self.nodes = nodes
        self.lock = lock
        with self.lock:
            self.id = len(list(nodes.keys()))
            host = "127.0.0.1"
            port = 7022 + self.id
            node = {"id": self.id, "address": (host, port), "listening": False}
            self.nodes[self.id] = node
        print("new node: nodes[%s] = %s" % (self.id, nodes[self.id]))

        # Set up socketserver.

        # A plain queue.Queue won't work here: the socketserver thread
        # runs in the process that constructed this Node (it starts in
        # __init__, before start() forks off the new process), while
        # receive() runs in the new process. The queue therefore has to
        # cross a process boundary, which only mp.Queue can do.
        self.queue = mp.Queue()

        # This MixIn usage is directly from the python.org
        # socketserver docs
        class ThreadedTCPServer(socketserver.ThreadingMixIn,
                                socketserver.TCPServer):
            pass
        class HandlerWithQueue(socketserver.BaseRequestHandler):
            # Something of a hack: using class variables to give the
            # Handler access to this Node-specific data
            handler_queue = self.queue
            handler_id = self.id
            def handle(self):
                # could receive data in multiple chunks, so loop and
                # concatenate
                item = bytes()
                recvs = 0
                while True:
                    data = self.request.recv(4096)
                    if not data: # EOF: sender closed the connection
                        break
                    item += data
                    recvs += 1
                if len(item):
                    # Receive a pickle here and put it straight on
                    # queue. Will be unpickled when taken off queue.
                    print("%d: socketserver received %d bytes in %d recv()s"
                          % (self.handler_id, len(item), recvs))
                    self.handler_queue.put(item)

        self.server = ThreadedTCPServer((host, port), HandlerWithQueue)
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True # Tell it to exit when Node exits.
        self.server_thread.start()
        print("%d: server loop running in thread %s" %
              (self.id, self.server_thread.name))

        # Now ready to receive
        with self.lock:
            # Careful: if we assign directly to
            # self.nodes[self.id]["listening"], the new value *won't*
            # be propagated to other Nodes by the Manager.dict. Have
            # to use this hack to re-assign the Manager.dict key.
            node = self.nodes[self.id]
            node["listening"] = True
            self.nodes[self.id] = node

    def send(self):
        # Find a destination. All listening nodes are eligible except self.
        dests = [node for node in self.nodes.values()
                 if node["id"] != self.id and node["listening"]]
        if len(dests) < 1:
            print("%d: no node to send to" % self.id)
            return
        dest = random.choice(dests)
        print("%d: sending to %s" % (self.id, dest["id"]))

        # send
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(dest["address"])
        except socket.error:
            print("%d: failed to send to %s" % (self.id, dest["id"]))
        else:
            item = pickle.dumps(Message(self.id))
            s.sendall(item)
        finally:
            s.close()

    # Check our queue for incoming messages.
    def receive(self):
        while True:
            try:
                message = pickle.loads(self.queue.get(False))
                print("%d: received %s" % (self.id, str(message)))
            except queue.Empty:
                break

    def run(self):
        print("%d: in run()" % self.id)
        # Main loop. Loop until at least 10 Nodes exist. Because of
        # parallel processing we might get a few more
        while len(list(self.nodes.keys())) < 10:
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.send()
            time.sleep(random.random() * 0.5) # simulate heavy computation
            self.receive()
            # maybe make a new node
            if random.random() < 0.1:
                new = Node(self.nodes, self.lock)
                new.start()
        # Seems natural to call server_thread.shutdown() here, but it
        # hangs. But since we've set the thread to be a daemon, it
        # will exit when this process does.
        print("%d: finished" % self.id)

if __name__=="__main__":
    manager = mp.Manager()
    nodes = manager.dict()
    lock = mp.Lock()
    # make just one node: it will make more
    node0 = Node(nodes, lock)
    node0.start()
    # This doesn't join on all the other nodes created subsequently.
    # But everything seems to work out ok.
    node0.join()