Granting child processes access to shared memory after they have started

Posted 2024-12-04 08:14:20


How do I give child processes access to data in shared memory if the data is only available after the child processes have been spawned (using multiprocessing.Process)?

I am aware of multiprocessing.sharedctypes.RawArray, but I can't figure out how to give my child processes access to a RawArray that is created after the processes have already started.

The data is generated by the parent process, and the amount of data is not known in advance.

If not for the GIL I'd be using threading instead, which would make this task a little simpler. Using a non-CPython implementation is not an option.


Looking under the hood of multiprocessing.sharedctypes, it looks like shared ctype objects are allocated using mmap'ed memory.

So this question really boils down to: Can a child process access anonymously mapped memory if mmap() was called by the parent after the child process was spawned?

That's somewhat in the vein of what's being asked in this question, except that in my case the caller of mmap() is the parent process and not the child process.


(Solved)

I created my own version of RawArray that uses shm_open() under the hood. The resulting shared ctypes array can be shared with any process as long as the identifier (tag) matches.

See this answer for details and an example.


Comments (3)

情话已封尘 2024-12-11 08:14:20


Disclaimer: I am the author of the question.

I eventually used the posix_ipc module to create my own version of RawArray. I mainly used posix_ipc.SharedMemory, which calls shm_open() under the hood.

My implementation (ShmemRawArray) exposes the same functionality as RawArray but requires two additional parameters - a tag to uniquely identify the shared memory region, and a create flag to determine whether to create a new shared memory segment or attach to an existing one.

Here's a copy if anyone's interested: https://gist.github.com/1222327

ShmemRawArray(typecode_or_type, size_or_initializer, tag, create=True)

Usage notes:

  • The first two args (typecode_or_type and size_or_initializer) should work the same as with RawArray.
  • The shared array is accessible by any process, as long as tag matches.
  • The shared memory segment is unlinked when the origin object (returned by ShmemRawArray(..., create=True)) is deleted.
  • Creating a shared array using a tag that currently exists will raise an ExistentialError.
  • Accessing a shared array using a tag that doesn't exist (or one that has been unlinked) will also raise an ExistentialError.

An SSCCE (Short, Self-Contained, Compilable Example) showing it in action:

#!/usr/bin/env python2.7
import ctypes
import multiprocessing
from random import random, randint
from shmemctypes import ShmemRawArray

class Point(ctypes.Structure):
    _fields_ = [ ("x", ctypes.c_double), ("y", ctypes.c_double) ]

def worker(q):
    # get access to ctypes array shared by parent
    count, tag = q.get()
    shared_data = ShmemRawArray(Point, count, tag, False)

    proc_name = multiprocessing.current_process().name
    print proc_name, ["%.3f %.3f" % (d.x, d.y) for d in shared_data]

if __name__ == '__main__':
    procs = []
    np = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()

    # spawn child processes
    for i in xrange(np):
        p = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(p)
        p.start()

    # create a unique tag for shmem segment
    tag = "stack-overflow-%d" % multiprocessing.current_process().pid

    # random number of points with random data
    count = randint(3,10) 
    combined_data = [Point(x=random(), y=random()) for i in xrange(count)]

    # create ctypes array in shared memory using ShmemRawArray
    # - we won't be able to use multiprocessing.sharedctypes.RawArray here
    #   because children already spawned
    shared_data = ShmemRawArray(Point, combined_data, tag)

    # give children info needed to access ctypes array
    for p in procs:
        queue.put((count, tag))

    print "Parent", ["%.3f %.3f" % (d.x, d.y) for d in shared_data]
    for p in procs:
        p.join()

Running this results in the following output:

[me@home]$ ./shmem_test.py
Parent ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-1 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-2 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-3 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
Process-4 ['0.633 0.296', '0.559 0.008', '0.814 0.752', '0.842 0.110']
自演自醉 2024-12-11 08:14:20


Your problem sounds like a perfect fit for the posix_ipc or sysv_ipc modules, which expose either the POSIX or SysV APIs for shared memory, semaphores, and message queues. The feature matrix there includes excellent advice for picking amongst the modules he provides.

The problem with anonymous mmap(2) areas is that you cannot easily share them with other processes -- if they were file-backed, it'd be easy, but if you don't actually need the file for anything else, it feels silly. You could use the CLONE_VM flag to the clone(2) system call if this were in C, but I wouldn't want to try using it with a language interpreter that probably makes assumptions about memory safety. (It'd be a little dangerous even in C, as maintenance programmers five years from now might also be shocked by the CLONE_VM behavior.)

But the SysV and newer POSIX shared memory mappings allow even unrelated processes to attach and detach from shared memory by identifier, so all you need to do is share the identifier from the processes that create the mappings with the processes that consume the mappings, and then when you manipulate data within the mappings, they are available to all processes simultaneously without any additional parsing overhead. The shm_open(3) function returns an int that is used as a file descriptor in later calls to ftruncate(2) and then mmap(2), so other processes can use the shared memory segment without a file being created in the filesystem -- and this memory will persist even if all processes using it have exited. (A little strange for Unix, perhaps, but it is flexible.)
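For reference, the attach-by-identifier flow described above later landed in the standard library: since Python 3.8 (well after this exchange), multiprocessing.shared_memory.SharedMemory wraps the same shm_open() mechanism. A minimal Python 3 sketch, with the creating and attaching "ends" shown in one process for brevity (in practice the name would be sent to the children, e.g. via a Queue):

```python
from multiprocessing import shared_memory

# Creator: make a named segment after children have already started.
shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b"hello"          # write through the memoryview
name = shm.name                 # this string is all a child needs

# Attacher: open the existing segment by name, no inheritance required.
other = shared_memory.SharedMemory(name=name)
data = bytes(other.buf[:5])     # b"hello"

other.close()                   # attacher detaches
shm.close()
shm.unlink()                    # creator removes the segment
```

Like the shm_open()-based solutions in this thread, the segment outlives any single process until it is explicitly unlinked.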

美羊羊 2024-12-11 08:14:20


I think you are looking for the mmap module.

Concerning the serialization of the data, see this question. Of course, if you hope to avoid a copy, I don't have a solution.

EDIT

In fact, you can use the non-stdlib _multiprocessing module in CPython 3.2 to get the address of the mmap object and use it with from_address of a ctypes object.
That is, in fact, what RawArray does. Of course, you should not try to resize the mmap object, as its address may change in that case.

import mmap
import _multiprocessing
from ctypes import Structure, c_int

map = mmap.mmap(-1, 4)

class A(Structure):
    _fields_ = [("x", c_int)]

x = _multiprocessing.address_of_buffer(map)
b = A.from_address(x[0])
b.x = 256

>>> map[0:4]
'\x00\x01\x00\x00'

To expose the memory after the child is created, you have to back the mapping with a real file, that is, by calling:

map = mmap.mmap(open("hello.txt", "r+b").fileno(),4)