Multiprocess or multithread? - parallelizing a simple computation over millions of iterations and storing the results in a single data structure
I have a dictionary D of {string:list} entries, and I compute a function
f( D[s1],D[s2] ) --> float
for a pair of strings (s1,s2) in D.
Additionally,
I have created a custom matrix class LabeledNumericMatrix that allows me to perform assignments such as m[ ID1, ID2 ] = 1.0 .
I need to calculate f(x,y) and store the result in m[x,y] for all 2-tuples in the set of strings S, including when s1=s2.
This is easy to code as a loop, but execution of this code takes quite some time as the size of the set S grows to large values such as 10,000 or more.
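For concreteness, the serial loop might look roughly like this (a sketch with toy stand-ins for f, D, and the labeled matrix, not the real code):

```python
def f(v1, v2):
    # Placeholder for the real scoring function f(D[s1], D[s2]).
    return float(len(set(v1) & set(v2)))

D = {"a": [1, 2], "b": [2, 3], "c": [3, 4]}   # toy {string: list} dictionary
S = sorted(D)

m = {}                                        # plain dict standing in for LabeledNumericMatrix
for i, s1 in enumerate(S):
    for s2 in S[i:]:                          # include s1 == s2; (s2, s1) is symmetric
        m[s1, s2] = f(D[s1], D[s2])

print(m)
```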
None of the results I store in my labeled matrix m depend on each other.
Therefore, it seems straightforward to parallelize this computation by using python's multithread or multiprocess services.
However, since cPython doesn't truly allow me to simultaneously execute the calculation of f(x,y) and the storage of m[x,y] through threading, it seems that multiprocessing is my only choice.
However, I don't think multiprocessing is designed to pass around 1GB data structures between processes, such as my labelled matrix structure containing 10000x10000 elements.
Can anyone advise (a) whether I should avoid trying to parallelize this algorithm, and (b) if parallelization is feasible, how best to do it, preferably in cPython?
First option - a Server Process
Create a server process. It is part of the multiprocessing package and allows parallel access to data structures: every process accesses the shared data structure through it, and it locks out the other processes while one of them is using it.
From the documentation: "A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies."
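A minimal sketch of this approach, using multiprocessing.Manager to host the shared dictionary and result store in a server process (the scoring function and data here are toy stand-ins, not the asker's code):

```python
from multiprocessing import Manager, Process

def f(v1, v2):
    # Placeholder for the real scoring function f(D[s1], D[s2]).
    return float(len(set(v1) & set(v2)))

def worker(shared_d, shared_results, pairs):
    # shared_d and shared_results are proxies to objects held by the server process.
    for s1, s2 in pairs:
        shared_results[(s1, s2)] = f(shared_d[s1], shared_d[s2])

if __name__ == "__main__":
    manager = Manager()
    shared_d = manager.dict({"a": [1, 2], "b": [2, 3], "c": [3, 4]})  # toy data
    shared_results = manager.dict()   # stands in for the labeled matrix

    ids = sorted(shared_d.keys())
    pairs = [(s1, s2) for i, s1 in enumerate(ids) for s2 in ids[i:]]
    half = len(pairs) // 2
    procs = [Process(target=worker, args=(shared_d, shared_results, chunk))
             for chunk in (pairs[:half], pairs[half:])]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(dict(shared_results))
```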
Second option - Pool of workers
Create a Pool of workers, an input Queue and a result Queue.
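A sketch of what that could look like, with worker processes pulling (s1, s2) pairs from a task queue and pushing scores onto a result queue (toy data and a placeholder scoring function; None sentinels stop the workers):

```python
from multiprocessing import Process, Queue, cpu_count

def f(v1, v2):
    # Placeholder for the real scoring function f(D[s1], D[s2]).
    return float(len(set(v1) & set(v2)))

def worker(d, task_q, result_q):
    # Pull (s1, s2) pairs until the None sentinel is seen, push results back.
    while True:
        pair = task_q.get()
        if pair is None:
            break
        s1, s2 = pair
        result_q.put((s1, s2, f(d[s1], d[s2])))

if __name__ == "__main__":
    D = {"a": [1, 2], "b": [2, 3], "c": [3, 4]}   # toy data
    ids = sorted(D)
    pairs = [(s1, s2) for i, s1 in enumerate(ids) for s2 in ids[i:]]

    task_q, result_q = Queue(), Queue()
    workers = [Process(target=worker, args=(D, task_q, result_q))
               for _ in range(cpu_count())]
    for w in workers:
        w.start()
    for pair in pairs:
        task_q.put(pair)
    for _ in workers:
        task_q.put(None)                # one sentinel per worker

    m = {}                              # stands in for the labeled matrix
    for _ in pairs:                     # drain results before joining the workers
        s1, s2, score = result_q.get()
        m[s1, s2] = score
    for w in workers:
        w.join()
    print(m)
```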
Third option - divide into independent problems
Your data is independent: computing f( D[si],D[sj] ) is a self-contained problem, independent of any f( D[sk],D[sl] ). Furthermore, the computation time of each pair should be fairly equal, or at least of the same order of magnitude.
Divide the task into n inputs sets, where n is the number of computation units (cores, or even computers) you have. Give each input set to a different process, and join the output.
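For instance, a rough sketch using multiprocessing.Pool, where each worker scores an independent slice of the pair list and the parent joins the partial results (toy data and a placeholder scoring function, not the asker's code):

```python
from multiprocessing import Pool

D = {"a": [1, 2], "b": [2, 3], "c": [3, 4]}   # toy data in place of the real dictionary

def f(v1, v2):
    # Placeholder for the real scoring function f(D[s1], D[s2]).
    return float(len(set(v1) & set(v2)))

def score_chunk(pairs):
    # Each worker handles an independent slice of the pair list and returns
    # its partial results; the parent joins them at the end.
    return [(s1, s2, f(D[s1], D[s2])) for s1, s2 in pairs]

if __name__ == "__main__":
    ids = sorted(D)
    pairs = [(s1, s2) for i, s1 in enumerate(ids) for s2 in ids[i:]]
    n = 2                                      # number of computation units
    chunks = [pairs[i::n] for i in range(n)]   # round-robin split into n slices

    pool = Pool(processes=n)
    partial_results = pool.map(score_chunk, chunks)
    pool.close()
    pool.join()

    m = {}                                     # stands in for the labeled matrix
    for part in partial_results:
        for s1, s2, score in part:
            m[s1, s2] = score
    print(m)
```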
You definitely won't get any performance increase with threading - it is an inappropriate tool for CPU-bound tasks. So the only possible choice is multiprocessing, but since you have a big data structure, I'd suggest something like mmap (pretty low level, but built in) or Redis (tasty, high-level API, but has to be installed and configured).
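To illustrate the mmap idea, here is a rough sketch (not the asker's code) in which several processes map the same pre-sized file and write doubles at offsets computed from row and column indices:

```python
import mmap
import os
import struct
from multiprocessing import Process

N = 4                                   # toy matrix dimension (10,000 in the real problem)
ITEM = struct.calcsize("d")             # one C double per matrix cell
PATH = "matrix.bin"                     # placeholder path for the shared backing file

def write_block(rows):
    # Each process maps the same file and writes its own rows in place.
    with open(PATH, "r+b") as fh:
        buf = mmap.mmap(fh.fileno(), N * N * ITEM)
        for i in rows:
            for j in range(N):
                # The value here is a dummy standing in for f(D[s1], D[s2]).
                struct.pack_into("d", buf, (i * N + j) * ITEM, float(i * N + j))
        buf.flush()
        buf.close()

if __name__ == "__main__":
    with open(PATH, "wb") as fh:        # pre-size the backing file with zeros
        fh.truncate(N * N * ITEM)
    procs = [Process(target=write_block, args=(rows,))
             for rows in ([0, 1], [2, 3])]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    with open(PATH, "rb") as fh:
        print(struct.unpack("%dd" % (N * N), fh.read()))
    os.remove(PATH)
```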
Have you profiled your code? Is it just calculating f that is too expensive, or storing the results in the data structure (or maybe both)?
If f is dominant, then you should make sure you can't make algorithmic improvements before you start worrying about parallelization. You might be able to get a big speed up by turning some or all of the function into a C extension, perhaps using cython. If you do go with multiprocessing, then I don't see why you need to pass the entire data structure between processes?
If storing results in the matrix is too expensive, you might speed up your code by using a more efficient data structure (like array.array or numpy.ndarray). Unless you have been very careful designing and implementing your custom matrix class, it will almost certainly be slower than those.
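As an illustration, storage with a plain numpy array plus a string-ID-to-index mapping could look like this (a sketch, assuming the set of IDs is known up front):

```python
import numpy as np

ids = ["a", "b", "c"]                      # toy stand-in for the real set S
index = {s: i for i, s in enumerate(ids)}  # string ID -> row/column index
m = np.zeros((len(ids), len(ids)))         # dense float64 matrix

def store(s1, s2, value):
    # Equivalent of m[ID1, ID2] = value in the custom labeled-matrix class.
    m[index[s1], index[s2]] = value

store("a", "b", 1.0)
print(m)
```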
Thank you everyone for your responses.
I have created a solution (not "the" solution) to the proposed problem, and since others may find it useful, I am posting the code here. My solution is a hybrid of options 1 and 3 suggested by Adam Matan. The code contains line numbers from my vi session, which will help in the discussion below.
Lines 36-47 are simply preliminary stuff related to the problem definition that was a part of the original question.
The setup for the multiprocessing to get around cPython's GIL is in lines 49-56, with lines 57-70 used to evenly create the subdivided tasks. Code in lines 57-70 is used instead of itertools.product, because when the list of row/column IDs reaches 40,000 or so, the product ends up taking an enormous amount of memory.
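The posted code itself is not reproduced here, but the idea of dividing the work by rows so that the full cross product is never materialized might look roughly like this (a sketch, not the original lines 57-70):

```python
def row_blocks(ids, n_chunks):
    # Split the row IDs (not the full cross product) into n_chunks pieces;
    # each worker then generates its own (s1, s2) pairs lazily, so the full
    # product never has to be held in memory by the parent process.
    size = (len(ids) + n_chunks - 1) // n_chunks
    return [ids[i:i + size] for i in range(0, len(ids), size)]

def pairs_for_rows(rows, all_ids):
    # Generator of the pairs a single worker is responsible for.
    for s1 in rows:
        for s2 in all_ids:
            yield s1, s2

if __name__ == "__main__":
    ids = ["a", "b", "c", "d"]
    for block in row_blocks(ids, 2):
        print(block, list(pairs_for_rows(block, ids)))
```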
The actual computation to be performed is in lines 74-78, and here the shared dictionary of ID->vector entries and shared result queue are utilized.
Lines 81-85 setup the actual Process objects, though they haven't actually been started yet.
In my first attempt (not shown here), the "try ... resultQueue.get() and assign except ..." code was actually located outside of the outer control loop (while not all calculations finished). When I ran that version of the code on a unit test of a 9x9 matrix, there were no problems.
However, moving up to 200x200 or larger, I found this code to hang, despite not changing anything in the code between executions.
According to this discussion (http://bugs.python.org/issue8426) and the official documentation for multiprocessing, use of multiprocessing.Queue can hang if the underlying implementation doesn't have a very large pipe/socket size. Therefore, the code given here as my solution periodically empties the queue while checking on completion of processes (see lines 91-106), so that the child processes can continue to put new results into it and the pipe never becomes too full.
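A rough sketch of that drain-while-polling pattern (a toy stand-alone example, not the posted code; the polling interval mirrors the processCheckTime parameter discussed in the next paragraph):

```python
import time
from multiprocessing import Process, Queue

def worker(start, stop, result_q):
    # Toy computation standing in for f(D[s1], D[s2]); results go on the queue.
    for k in range(start, stop):
        result_q.put((k, float(k)))

if __name__ == "__main__":
    result_q = Queue()
    processes = [Process(target=worker, args=(i * 100, (i + 1) * 100, result_q))
                 for i in range(4)]
    for p in processes:
        p.start()

    m = {}                                   # stands in for the labeled matrix
    process_check_time = 0.1                 # polling interval (1.0 s proved too slow)
    while any(p.is_alive() for p in processes):
        # Drain whatever is currently on the queue so the children never block
        # on a full pipe, then sleep briefly before checking the processes again.
        while not result_q.empty():
            key, value = result_q.get()
            m[key] = value
        time.sleep(process_check_time)

    while not result_q.empty():              # final drain after all workers exit
        key, value = result_q.get()
        m[key] = value
    for p in processes:
        p.join()
    print(len(m))
```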
When I tested the code on larger matrices of 1000x1000, I noticed that the computation code finished well ahead of the Queue and matrix assignment code. Using cProfile, I found that one bottleneck was the default polling interval processCheckTime=1.0 (line 23), and lowering this value improved the speed of results (see bottom of post for timing examples). This might be useful information for other people new to multiprocessing in Python.
Overall, this is probably not the best implementation possible, but it does provide a starting point for further optimization. As is often said, optimization via parallelization requires proper analysis and thought.
Timing examples, all with 8 CPUs.
In case anyone wants to copy-and-paste the code, here is a unit test I used during part of the development. Obviously, the labelled matrix class code is not here, and the fingerprint reader/scorer code is not included (though it is pretty simple to roll your own). Of course, I'm happy to share that code as well if it would help someone.
In reference to my last comment attached to the code posted on March 21, I found multiprocessing.Pool + SQLite (pysqlite2) unusable for my particular task, as two problems occurred:
(1) Using the default connection, every worker process other than the first one that performed an insert query executed only once.
(2) When I changed the connection keyword to check_same_thread=False, the full pool of workers was used, but then only some queries succeeded and some failed. When each worker also executed time.sleep(0.01), the number of query failures was reduced, but not eliminated entirely.
(3) Less importantly, I could hear my hard disk reading/writing frantically, even for a small job list of 10 insert queries.
I next resorted to MySQL-Python, and things worked out much better. True, one must set up the MySQL server daemon, a user, and a database for that user, but those steps are relatively simple.
Here is sample code that worked for me. Obviously it could be optimized, but it conveys the basic idea for those who are looking for how to get started using multiprocessing.
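The working code itself is not included above, but the pattern described, a multiprocessing.Pool whose workers each open their own MySQLdb connection and issue inserts, might look roughly like this (credentials and the results table are placeholders, not the original code):

```python
import MySQLdb
from multiprocessing import Pool

# Placeholder credentials and schema: a 'results(id, score)' table is assumed
# to already exist in the 'somedb' database.
DB_ARGS = dict(host="localhost", user="someuser", passwd="somepassword", db="somedb")

def insert_result(task):
    # Each call runs in a worker process and opens its own connection;
    # MySQL connections must not be shared across processes.
    key, value = task
    conn = MySQLdb.connect(**DB_ARGS)
    try:
        cur = conn.cursor()
        cur.execute("INSERT INTO results (id, score) VALUES (%s, %s)", (key, value))
        conn.commit()
    finally:
        conn.close()
    return key

if __name__ == "__main__":
    tasks = [("pair%d" % i, float(i)) for i in range(10)]
    pool = Pool(processes=4)
    print(pool.map(insert_result, tasks))
    pool.close()
    pool.join()
```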