Python 2.6: process-local storage while using multiprocessing.Pool


I'm attempting to build a Python script that has a pool of worker processes (using multiprocessing.Pool) working across a large set of data.

I want each process to have a unique object that gets reused across the multiple tasks that process executes.

Pseudocode:

def work(data):
    # connection should be unique per process
    connection.put(data)
    print 'work done with connection:', connection

if __name__ == '__main__':
    pPool = Pool(4) # pool of 4 worker processes
    datas = range(1, 1001)
    for process in pPool:
        # this is the part I'm asking about -- how do I really do this?
        process.connection = Connection(conargs)
    for data in datas:
        pPool.apply_async(work, (data,))

4 Answers

败给现实 2024-10-31 03:17:21

I think something like this should work (not tested):

def init(*args):
    # runs once in each worker process as it starts up
    global connection
    connection = Connection(*args)

pPool = Pool(initializer=init, initargs=conargs)  # conargs must be a tuple
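
A minimal runnable sketch of the initializer approach, assuming a stand-in Connection class and a placeholder conargs value (neither appears in the original question):

import os
from multiprocessing import Pool

class Connection(object):
    # Stand-in for the real connection type.
    def __init__(self, host):
        self.host = host
    def put(self, data):
        print 'pid %d put %r via %s' % (os.getpid(), data, self.host)

def init(*args):
    # Each worker process builds its own private Connection at startup.
    global connection
    connection = Connection(*args)

def work(data):
    connection.put(data)  # uses this process's Connection

if __name__ == '__main__':
    pPool = Pool(4, initializer=init, initargs=('localhost',))
    pPool.map(work, range(1, 11))
    pPool.close()
    pPool.join()
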
时光沙漏 2024-10-31 03:17:21

It may be easiest to create the mp.Processes directly (without mp.Pool):

import multiprocessing as mp
import time

class Connection(object):
    def __init__(self, name):
        self.name = name
    def __str__(self):
        return self.name

def work(inqueue, conn):
    # Each worker loops forever, pulling data off the shared queue.
    name = mp.current_process().name
    while 1:
        data = inqueue.get()
        time.sleep(.5)
        print('{n}: work done with connection {c} on data {d}'.format(
            n=name, c=conn, d=data))
        inqueue.task_done()

if __name__ == '__main__':
    N = 4
    procs = []
    inqueue = mp.JoinableQueue()
    for i in range(N):
        # Each process is handed its own Connection at creation time.
        conn = Connection(name='Conn-' + str(i))
        proc = mp.Process(target=work, name='Proc-' + str(i),
                          args=(inqueue, conn))
        proc.daemon = True
        proc.start()
        procs.append(proc)

    datas = range(1, 11)
    for data in datas:
        inqueue.put(data)
    inqueue.join()

yields

Proc-0: work done with connection Conn-0 on data 1
Proc-1: work done with connection Conn-1 on data 2
Proc-3: work done with connection Conn-3 on data 3
Proc-2: work done with connection Conn-2 on data 4
Proc-0: work done with connection Conn-0 on data 5
Proc-1: work done with connection Conn-1 on data 6
Proc-3: work done with connection Conn-3 on data 7
Proc-2: work done with connection Conn-2 on data 8
Proc-0: work done with connection Conn-0 on data 9
Proc-1: work done with connection Conn-1 on data 10

Notice that each Proc number is paired with the same Conn number every time.
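
A note on the design: the JoinableQueue lets the parent block in inqueue.join() until every queued item has been matched by a task_done() call, and because the workers are daemon processes they are torn down automatically when the main process exits afterwards.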

眼睛会笑 2024-10-31 03:17:21

Process-local storage is pretty easy to implement as a mapping container, for anyone else getting here from Google looking for something similar. (Note: this is Py3, but it's easily converted to Python 2 syntax; just inherit from object.)

import os

class ProcessLocal:
    """
    Provides a basic per-process mapping container that wipes itself
    if the current PID changed since the last get/set.
    Aka `threading.local()`, but for processes instead of threads.
    """

    __pid__ = -1

    def __init__(self, mapping_factory=dict):
        self.__mapping_factory = mapping_factory

    def __handle_pid(self):
        # If we're now in a different process (e.g. after a fork),
        # discard the old store and start fresh.
        new_pid = os.getpid()
        if self.__pid__ != new_pid:
            self.__pid__, self.__store = new_pid, self.__mapping_factory()

    def __delitem__(self, key):
        self.__handle_pid()
        return self.__store.__delitem__(key)

    def __getitem__(self, key):
        self.__handle_pid()
        return self.__store.__getitem__(key)

    def __setitem__(self, key, val):
        self.__handle_pid()
        return self.__store.__setitem__(key, val)

See more @ https://github.com/akatrevorjay/pytutils/blob/develop/pytutils/mappings.py
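
A quick usage sketch, assuming the ProcessLocal class above is defined in the same module: each pool worker lazily creates its own value the first time it touches the container, because the PID check wipes whatever mapping was inherited from the parent.

import os
from multiprocessing import Pool

proc_local = ProcessLocal()  # module-level instance, shared by name

def work(data):
    try:
        conn = proc_local['connection']
    except KeyError:
        # First touch in this worker process: build its private value.
        conn = proc_local['connection'] = 'Conn-%d' % os.getpid()
    print('%d handled %r with %s' % (os.getpid(), data, conn))

if __name__ == '__main__':
    pool = Pool(4)
    pool.map(work, range(10))
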

趴在窗边数星星i 2024-10-31 03:17:21

You want to have an object residing in shared memory, right?

Python has some support for that in its standard library, but it's kinda poor. As far as I recall, only integers and some other primitive types can be stored.

Try POSH (Python Object Sharing): http://poshmodule.sourceforge.net/
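
For reference, a minimal sketch of the standard-library support being alluded to: multiprocessing.Value and multiprocessing.Array can share single C-typed primitives, and flat arrays of them, across processes.

from multiprocessing import Process, Value, Array

def work(counter, totals):
    with counter.get_lock():
        counter.value += 1   # shared C int, guarded by its lock
    with totals.get_lock():
        totals[0] += 1.0     # shared flat array of C doubles

if __name__ == '__main__':
    counter = Value('i', 0)          # 'i' = signed int
    totals = Array('d', [0.0] * 4)   # 'd' = double
    procs = [Process(target=work, args=(counter, totals)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print('counter=%d totals=%r' % (counter.value, list(totals)))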
