CPU 密集型任务 - 多处理方法性能比同步方法差 - 为什么?
我只是从异步编程开始,我有一个有关CPU绑定任务的问题,并进行了多处理。简而言之,为什么多处理比同步方法产生的时间性能差?在异步版本中,我的代码是否做错了?欢迎任何建议!
1:任务说明
我想使用 Google的Ngram数据集作为输入,创建一个巨大的字典包括每个单词和相应的单词计数。
如下:
corpus \ tyear \ tword_count \ t \ number_of_of_book_corpus_showup”
示例:
数据集中的每个记录看起来都
“ Intel Core i5-5300U CPU @ 2.30 GHz 8GB RAM
3:同步版 - 时间170.6280147 SEC
import time
with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
start = time.perf_counter()
all_input = file.readlines()
word_count_dict = {}
for line in all_input:
temp = line.replace('\n','').split('\t')
if temp[0] not in word_count_dict.keys():
word_count_dict[temp[0]] = temp[2]
else:
word_count_dict[temp[0]] += temp[2]
print(f'total time used for sync version as {time.perf_counter() - start}')
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
def data_spliting(input_data,chunk_size): # todo see next part for how to set chunk size
for x in range(0,len(input_data),chunk_size):
yield input_data[x:x+chunk_size]
def single_chunk_dict(chunk):
result = {}
for line in chunk:
temp = line.replace('\n','').split('\t')
if temp[0] not in result.keys():
result[temp[0]] = temp[2]
else:
result[temp[0]] += temp[2]
return result
def word_reduce(first_dict,second_dict):
result = {}
for map in [first_dict,second_dict]:
for key, value in map.items():
if key not in result.keys():
result[key] = value
else:
result[key] += value
return result
async def main():
with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
test = file.readlines()
with ProcessPoolExecutor() as process_pool:
loop = asyncio.get_running_loop()
tasks = [functools.partial(single_chunk_dict,ch) for ch in data_spliting(test,21654626)]
result = [loop.run_in_executor(process_pool,x) for x in tasks]
result = await asyncio.gather(*result)
output = functools.reduce(lambda x,y: word_reduce(x,y),result)
print(f'output total keys = {len(output.keys())}')
if __name__ == '__main__':
start = time.perf_counter()
asyncio.run(main())
print(f'Total Time for Completion as {time.perf_counter() - start}')
关于彼得斯先生的回答的进一步问题。
我不敢相信我的问题得到了蒂姆·彼得斯的回答。这太酷了!
1:在我正在阅读的书中,作者使用此任务演示MapReduce。我想知道MapReduce是否是多处理的好候选人。
2:在书中,作者建议使用Asyncio事件循环挂钩ProcessPoolexecutor,让我们使用API函数,例如gather()和as_complete()。将ProcessPoolExecutor与Asyncio混合是一个好习惯吗?还是我应该在ProcessPoolExecutor中坚持MAP()函数?
3:“理想的”候选者的粗粒平行性可以进行大量计算,每个字节需要在过程之间传输,并且根本不需要太多的过程间通信。”
“在过程之间转移”和“过程间交流”是什么意思?
I just get start with asynchronous programming, and I have one questions regarding CPU bound task with multiprocessing. In short, why multiprocessing generated way worse time performance than Synchronous approach? Did I do anything wrong with my code in asynchronous version? Any suggestions are welcome!
1: Task description
I want use one of the Google's Ngram datasets as input, and create a huge dictionary includes each words and corresponding words count.
Each Record in the dataset looks like follow :
"corpus\tyear\tWord_Count\t\Number_of_Book_Corpus_Showup"
Example:
"A'Aang_NOUN\t1879\t45\t5\n"
2: Hardware Information:
Intel Core i5-5300U CPU @ 2.30 GHz 8GB RAM
3: Synchronous Version - Time Spent 170.6280147 sec
import time
with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
start = time.perf_counter()
all_input = file.readlines()
word_count_dict = {}
for line in all_input:
temp = line.replace('\n','').split('\t')
if temp[0] not in word_count_dict.keys():
word_count_dict[temp[0]] = temp[2]
else:
word_count_dict[temp[0]] += temp[2]
print(f'total time used for sync version as {time.perf_counter() - start}')
4: Asynchronous Version - Time Spent 611.5669237 sec
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time
def data_spliting(input_data,chunk_size): # todo see next part for how to set chunk size
for x in range(0,len(input_data),chunk_size):
yield input_data[x:x+chunk_size]
def single_chunk_dict(chunk):
result = {}
for line in chunk:
temp = line.replace('\n','').split('\t')
if temp[0] not in result.keys():
result[temp[0]] = temp[2]
else:
result[temp[0]] += temp[2]
return result
def word_reduce(first_dict,second_dict):
result = {}
for map in [first_dict,second_dict]:
for key, value in map.items():
if key not in result.keys():
result[key] = value
else:
result[key] += value
return result
async def main():
with open(r".\googlebooks-eng-all-1gram-20120701-a.gz",encoding='utf-8') as file:
test = file.readlines()
with ProcessPoolExecutor() as process_pool:
loop = asyncio.get_running_loop()
tasks = [functools.partial(single_chunk_dict,ch) for ch in data_spliting(test,21654626)]
result = [loop.run_in_executor(process_pool,x) for x in tasks]
result = await asyncio.gather(*result)
output = functools.reduce(lambda x,y: word_reduce(x,y),result)
print(f'output total keys = {len(output.keys())}')
if __name__ == '__main__':
start = time.perf_counter()
asyncio.run(main())
print(f'Total Time for Completion as {time.perf_counter() - start}')
Further Questions Regarding Mr. Peters' Answer.
I cannot believe my question get answered by Tim Peters. This is so cool!
1: In the book that I am reading, the author uses this task to demonstrate MapReduce. I am wondering if MapReduce is a good candidate for multiprocessing.
2: In the book, the author suggests hooking ProcessPoolExecutor with asyncio event loop let us use the API function such as gather() and as_complete(). Is mixing ProcessPoolExecutor with asyncio a good practice? Or should I stick with the map() function in ProcessPoolExecutor?
3: "An "ideal" candidate for coarse-grained parallelism does a whole lot of computation per byte that needs to be transferred between processes, and not need much inter-process communication at all."
What do "transferred between processes" and "inter-process communication" mean?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
在您的代码中,我不了解很多。因此,我将为您提供有效的代码;-)
我对您的代码的运行方式感到困惑。
.gz
文件是压缩二进制数据(GZIP压缩)。您应该需要使用Python的gzip.open()
打开它。正如我期望的那样,我希望它会像我尝试时一样死亡。。
temp [2]
不是整数。这是一个字符串。您不是在此处添加整数,而是用+
来表达字符串。int()
需要先应用。我不相信我曾经看过
asyncio
与confurrent.futures
混合。没有必要。asyncio
针对单个线程中的细粒度伪算;并发。Futures
针对跨流程的粗粒纯正并发。您想要后者。没有asyncio
。,代码更容易,更简单,更快。
condurrent.futures
很好,我已经足够大,以至于我投入了很多东西来学习较旧的多处理
,所以我在这里使用它。这些ngram文件足够大,无论运行串行版本还是并行版本,我都在“块”读取。
collections.counter
比普通的dict更适合您的任务。当我在机器上的速度比您更快时,上面提到的一些更改与我的时间更快有关。
我确实使用3个工作流程得到了加速,但是实际上,几乎所有3个工作都没有被利用。输入的每条线几乎没有完成计算,我希望它比CPU结合更多。所有的过程也在争夺缓存空间,缓存错过很昂贵。粗粒平行性的“理想”候选者可以在过程之间传递大量的每个字节,并且根本不需要太多的过程间通信。这个问题都不是正确的。
并从一项运行中输出:
编辑:使用
conturrent.futures
相反,在这里使用
conturrent.futures
在这里而不是多处理
尽管MP
有很多铃铛&吹口哨可能需要一段时间才能意识到这一点。时机也不重要:在封面下,时间级别的过程间管或插座会压倒性地消耗时间。您使用哪种高级API来达到这一点并不重要。只需替换mp
部分,就像这样:确实,代码本质上是相同的。
Q&一个
“ mapReduce”是一种具有几种含义的艺术术语,主要是指安排平行计算的一种思考方式,尤其是 apache hadoop 该模型的实现。标准(python.org)Python分布不直接支持它。
在我向您展示的代码中,“映射”部分由其名称中的``映射''拼写(
多处理
'simap_unordored()
和并发.future
'smap()
)。 “降低”部分被简单地拼写为“+=” -counter
直接支持以这种方式组合其中的两个,这比任何间接做的方式都更为明显,更有效它。如果您想认真对待MapReduce,建议您下载专门针对该框架的精心设计的软件包。在极端(例如Hadoop),他们将要实现自己的文件系统。
这里也不需要
asyncio
的“ as as as as tote”部分。pool
'simap_unordered()
(我使用的代码使用)直接完成了这些工作,在完成后返回结果。虽然我没有在这里显示它,但conturrent.futures
提供了as_completed()
函数,其作用大致相同。同样,由于包装直接实施这些包装,它们比任何间接的方法都更为明显,效率。我的目标是阻止您使用
asyncio
,除非确实需要(并且不是在这里)。可以肯定的是,绝大多数Python程序员都不知道asyncio
做什么,因此无法使用它遵循,调试或扩展代码。多处理
和costurrent.futures
对IPC(过程间通信)的挑战性足够多,您的操作系统可以保证两个进程具有不同的地址空间。 没有什么之间共享。与其他任何过程一样,Python过程都是如此。例如,例如,您在多进程Python程序中具有一个字符串对象,则仅一个过程可见。如果您想查看另一个过程,则需要IPC将对象“将”对象“发送”到另一个过程。那远非免费。在封面下,字符串通过
腌制
模块变成一个字节序列,将字节序列推入OS级插座或管道间程序间连接,并且接收过程必须采取该序列并取消分配以重建原始字符串对象的副本。在此问题中,一个巨大的瓶颈是输入数据文件。它是压缩的二进制数据,因此无法将不同的过程偏移到文件中,以便他们可以在自己的文章上 start 。取而代之的是,一个进程必须对其进行解压缩,并使用IPC将(再次是!)行发送到其他过程。解开拉链后,这是在2千兆字节的原始数据上完成的,以单独发送跨过程,它们一次通过多层软件(腌制,管道或插座的两端,不打印)咀嚼一个字节上。封面下有很多机器可以支持这一点,它们都无法免费使用,包括使用辅助线程和过程间锁的方法,以确保所有数据保持理智,并且不会“冻结”发送或接收过程。
什么是“理想”应用?例如,您有要考虑(素数)的一百万个大整数列表。 200个数字的整数仅为200个字节。发送跨流程不是免费的,但是与对其进行分解的年份相比,成本是微不足道的;-)如果您想在每个整数中添加1个,则跨流程发送整数的成本是 高于添加1的成本。
在手头的情况下,所做的实际工作包括将一条短线分解为少数碎片(在选项卡上分开),将代表小整数的字符串转换为int ,然后将该INT添加到由另一个字符串的索引索引中的运行总数中。不是免费的,但也不昂贵。除非对输入文件进行重新设计,否则它并不是多处理的有前途的候选人>根本没有结合其命令的结合)。
There's quite a bit I don't understand in your code. So instead I'll just give you code that works ;-)
I'm baffled by how your code can run at all. A
.gz
file is compressed binary data (gzip compression). You should need to open it with Python'sgzip.open()
. As is, I expect it to die with an encoding exception, as it does when I try it.temp[2]
is not an integer. It's a string. You're not adding integers here, you're catenating strings with+
.int()
needs to be applied first.I don't believe I've ever seen
asyncio
mixed withconcurrent.futures
before. There's no need for it.asyncio
is aimed at fine-grained pseudo-concurrency in a single thread;concurrent.futures
is aimed at coarse-grained genuine concurrency across processes. You want the latter here. The code is easier, simpler, and faster withoutasyncio
.While
concurrent.futures
is fine, I'm old enough that I invested a whole lot into learning the oldermultiprocessing
first, and so I'm using that here.These ngram files are big enough that I'm "chunking" the reads regardless of whether running the serial or parallel version.
collections.Counter
is much better suited to your task than a plain dict.While I'm on a faster machine than you, some of the changes alluded to above have a lot do with my faster times.
I do get a speedup using 3 worker processes, but, really, all 3 were hardly ever being utilized. There's very little computation being done per line of input, and I expect that it's more memory-bound than CPU-bound. All the processes are fighting for cache space too, and cache misses are expensive. An "ideal" candidate for coarse-grained parallelism does a whole lot of computation per byte that needs to be transferred between processes, and not need much inter-process communication at all. Neither are true of this problem.
and output from one run:
EDIT: Using
concurrent.futures
insteadIt's really neither easier nor harder to use
concurrent.futures
here instead ofmultiprocessing
, althoughmp
has so very many bells & whistles it may take a while to realize that. The timing doesn't really matter either: under the covers, the time is overwhelmingly consumed by OS-level inter-process pipes or sockets. Which higher-level API you use to get at that doesn't much matter to speed. Just replace themp
part like so:Indeed, the code is essentially the same.
Q&A
"MapReduce" is a term of art with several meanings, referring mostly to a way of thinking about arranging parallel computations, and especially to the Apache Hadoop implementation of that model. The standard (python.org) Python distribution does not support it directly as such.
In the code I showed you, the "map" part is spelled by functions with "map" in their name (
multiprocessing
'simap_unordered()
, andconcurrent.future
'smap()
). The "reduce" part was spelled simply "+=" -Counter
s directly support that way to combine two of them, which is (or should be) more obvious and more efficient than any indirect way of doing it.If you want to pursue MapReduce seriously, I suggest downloading elaborate software packages specifically aiming at that framework. At the extreme (like Hadoop), they're going to want to implement their own filesystem.
The "as complete" part from
asyncio
isn't really needed here either.Pool
'simap_unordered()
(which my code used) directly accomplishes that, returning results as they're completed. While I didn't show it here,concurrent.futures
offers anas_completed()
function that does much the same. Again since the packages implement those directly, they're more obvious and efficient than any indirect way of doing it.I do aim to discourage you from using
asyncio
unless it's truly needed (and it isn't, here). It's a safe bet that a large majority of Python programmers have no idea whatasyncio
does, and so can't follow, debug, or extend code using it.multiprocessing
andconcurrent.futures
are challenging enough on their own ;-)About IPC (inter-process communication), your OS guarantees that two processes have different address spaces. Nothing is shared between them. That's as true of Python processes as of any others. If, e.g., you have a string object in a multi-process Python program, it's visible to only one process. If you want another process to see it, you need IPC to "send" the object to that other process. That's far from free. Under the covers, the string is turned into a sequence of bytes by the
pickle
module, that sequence of bytes is pushed into an OS-level socket or pipe inter-process connection, and the receiving process has to take that sequence and unpickle it to reconstruct a copy of the original string object.In this problem, a huge bottleneck is the input data file. It's compressed binary data, and so there's no way to tell different processes offsets into the file so they can all start on their own piece. Instead one process has to decompress it and use IPC to send (again, physical copies of!) lines to other processes. After unzipping, that's closing on 2 gigabytes of raw data all on its own to send across processees, which they all chew over one byte at a time, through multiple layers of software (pickling, both ends of a pipe or socket, unpickling). There's a lot of machinery under the covers to support this, none of which works for free, including too ways of using helper threads and inter-process locks to ensure all the data stays sane and doesn't "freeze" the sending or receiving processes.
What's an "ideal" application? For example, you have a list of a million large integers you want to factor (into primes). A 200-digit integer is only about 200 bytes long. Sending that across processes isn't free, but the cost is trivial compared to the years it may take to factor it ;-) If instead you wanted to add 1 to each integer, the cost of sending an integer across processes is far higher than the cost of adding 1.
In the case at hand, the actual work done consists of breaking a short line into a handful of pieces (splitting on tabs), converting a string representing a small integer into an int, then adding that int to a running total in a dict indexed by another piece of the string. Not free, but not expensive either. It's not really a promising candidate for multiprocessing unless the input file is reworked so that multiple processes can read from it independently (for example, if the data was stored to begin with as 100 different files - then processes wouldn't need to communicate at all except to combine their dicts at the end).