Python: using multiprocessing is slower than not using it

Posted 2024-12-25 18:44:47

After spending a lot of time trying to wrap my head around multiprocessing, I came up with this code, which is a benchmark test:

Example 1:

from multiprocessing import Process

class Alter(Process):
    def __init__(self, word):
        Process.__init__(self)
        self.word = word
        self.word2 = ''

    def run(self):
        # Alter string + test processing speed
        for i in range(80000):
            self.word2 = self.word2 + self.word

if __name__=='__main__':
    # Send a string to be altered
    thread1 = Alter('foo')
    thread2 = Alter('bar')
    thread1.start()
    thread2.start()

    # wait for both to finish

    thread1.join()
    thread2.join()

    print(thread1.word2)
    print(thread2.word2)

This completes in 2 seconds (half the time of multithreading). Out of curiosity I decided to run this next:

Example 2:

word2 = 'foo'
word3 = 'bar'

word = 'foo'
for i in range(80000):
    word2 = word2 + word

word  = 'bar'
for i in range(80000):
    word3 = word3 + word

print(word2)
print(word3)

To my horror this ran in less than half a second!

What is going on here? I expected multiprocessing to run faster - shouldn't it complete in half of Example 2's time, given that Example 1 is Example 2 split into two processes?

Update:

After considering Chris' feedback, I have included the 'actual' code that consumes the most processing time and led me to consider multiprocessing:

self.ListVar = [[13379+ strings],[13379+ strings],
                [13379+ strings],[13379+ strings]]

for b in range(len(self.ListVar)):
    self.list1 = []
    self.temp = []
    for n in range(len(self.ListVar[b])):
        if not self.ListVar[b][n] in self.temp:
            self.list1.insert(n, self.ListVar[b][n] + '(' + 
                              str(self.ListVar[b].count(self.ListVar[b][n])) +
                              ')')
            self.temp.insert(0, self.ListVar[b][n])

    self.ListVar[b] = list(self.list1)

Comments (4)

豆芽 2025-01-01 18:44:48

Multiprocessing could be useful for what you're doing, but not in the way you're thinking of using it. Since you're basically doing some computation on every member of a list, you can use the multiprocessing.Pool.map method to perform that computation on the list members in parallel.

Here is an example that shows your code's performance using a single process and using multiprocessing.Pool.map:

from multiprocessing import Pool
from random import choice
from string import printable
from time import time

def build_test_list():
    # Builds a test list consisting of 5 sublists of 10000 strings each.
    # each string is 20 characters long
    testlist = [[], [], [], [], []]
    for sublist in testlist:
        for _ in range(10000):
            sublist.append(''.join(choice(printable) for _ in range(20)))
    return testlist

def process_list(l):
    # the time-consuming code
    result = []
    tmp = []
    for n in range(len(l)):
        if l[n] not in tmp:
            result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
            tmp.insert(0, l[n])
    return result

def single(l):
    # process the test list elements using a single process
    results = []
    for sublist in l:
        results.append(process_list(sublist))
    return results

def multi(l):
    # process the test list elements in parallel
    pool = Pool()
    results = pool.map(process_list, l)
    return results

print "Building the test list..."
testlist = build_test_list()

print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime

print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime

# make sure they both return the same thing
assert singleresults == multiresults

print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)

Output:

Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
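
As a small usage note, on Python 3 the pool is often created as a context manager so that its worker processes are shut down automatically when the block exits; a sketch of the same call in that style, assuming the process_list function and testlist from the example above:

from multiprocessing import Pool

if __name__ == '__main__':
    # the with-block shuts the pool down automatically on exit
    with Pool() as pool:
        results = pool.map(process_list, testlist)
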
西瓜 2025-01-01 18:44:48

ETA: Now that you've posted your code, I can tell you there is a simple way to do what you're doing MUCH faster (>100 times faster).

I see that what you're doing is adding a frequency in parentheses to each item in a list of strings. Instead of counting all the elements each time (which, as you can confirm using cProfile, is by far the largest bottleneck in your code), you can just create a dictionary that maps each element to its frequency. That way, you only have to go through the list twice: once to create the frequency dictionary, and once to use it to append the frequencies.
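
For illustration, here is a minimal sketch of that two-pass idea on a single flat list. The helper name annotate_with_counts is purely illustrative, and collections.Counter is equivalent to building the frequency dictionary by hand as new_method does in the full example below:

from collections import Counter

def annotate_with_counts(strings):
    # illustrative helper, not part of the original code
    # First pass: map each string to the number of times it appears.
    freq = Counter(strings)
    # Second pass: emit each distinct string once, with its count appended.
    seen = set()
    result = []
    for s in strings:
        if s not in seen:
            result.append(s + '(' + str(freq[s]) + ')')
            seen.add(s)
    return result

# e.g. annotate_with_counts(['foo', 'bar', 'foo']) gives ['foo(2)', 'bar(1)']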

Here I'll show my new method, time it, and compare it to the old method using a generated test case. The test case even shows the new result to be exactly identical to the old one. Note: All you really need to pay attention to below is the new_method.

import random
import time
import collections
import cProfile

LIST_LEN = 14000

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t


def random_string(length=3):
    """Return a random string of given length"""
    return "".join([chr(random.randint(65, 90)) for i in range(length)])


class Profiler:
    def __init__(self):
        self.original = [[random_string() for i in range(LIST_LEN)]
                            for j in range(4)]

    def old_method(self):
        self.ListVar = self.original[:]
        for b in range(len(self.ListVar)):
            self.list1 = []
            self.temp = []
            for n in range(len(self.ListVar[b])):
                if not self.ListVar[b][n] in self.temp:
                    self.list1.insert(n, self.ListVar[b][n] + '(' +    str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
                    self.temp.insert(0, self.ListVar[b][n])

            self.ListVar[b] = list(self.list1)
        return self.ListVar

    def new_method(self):
        self.ListVar = self.original[:]
        for i, inner_lst in enumerate(self.ListVar):
            freq_dict = collections.defaultdict(int)
            # create frequency dictionary
            for e in inner_lst:
                freq_dict[e] += 1
            temp = set()
            ret = []
            for e in inner_lst:
                if e not in temp:
                    ret.append(e + '(' + str(freq_dict[e]) + ')')
                    temp.add(e)
            self.ListVar[i] = ret
        return self.ListVar

    def time_and_confirm(self):
        """
        Time the old and new methods, and confirm they return the same value
        """
        time_a = time.time()
        l1 = self.old_method()
        time_b = time.time()
        l2 = self.new_method()
        time_c = time.time()

        # confirm that the two are the same
        assert l1 == l2, "The old and new methods don't return the same value"

        return time_b - time_a, time_c - time_b

p = Profiler()
print(p.time_and_confirm())

When I run this, it gets times of (15.963812112808228, 0.05961179733276367), meaning it's about 250 times faster, though this advantage depends on both how long the lists are and the frequency distribution within each list. I'm sure you'll agree that with this speed advantage, you probably won't need to use multiprocessing :)

(My original answer is left below for posterity)

ETA: By the way, it is worth noting that this algorithm is roughly linear in the length of the lists, while the code you used is quadratic (each call to list.count and each membership test against the temp list scans the whole list). This means the advantage grows as the number of elements increases. For example, if you increase the length of each list to 1000000, it takes only 5 seconds to run. Based on extrapolation, the old code would take over a day :)


It depends on the operation you are performing. For example:

import time
NUM_RANGE = 100000000

from multiprocessing import Process

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t

def multi():
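    # note: defining the Process subclass inside a function relies on the
    # 'fork' start method (the default on Linux); under 'spawn' (e.g. on
    # Windows) the class would need to live at module level so the child
    # process can unpickle it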
    class MultiProcess(Process):
        def __init__(self):
            Process.__init__(self)

        def run(self):
            # Alter string + test processing speed
            for i in range(NUM_RANGE):
                a = 20 * 20

    thread1 = MultiProcess()
    thread2 = MultiProcess()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

def single():
    for i in range(NUM_RANGE):
        a = 20 * 20

    for i in range(NUM_RANGE):
        a = 20 * 20

if __name__ == '__main__':
    print(timefunc(multi) / timefunc(single))

On my machine, the multiprocessed operation takes only ~60% of the time of the single-threaded one.

兔小萌 2025-01-01 18:44:48

This example is too small to benefit from multiprocessing.

There's a LOT of overhead when starting a new process. If there were heavy processing involved, it would be negligible. But your example really isn't all that intensive, and so you're bound to notice the overhead.

You'd probably notice a bigger difference with real threads; too bad Python (well, CPython) has issues with CPU-bound threading.
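
As a rough illustration of that overhead, a sketch like the one below (exact numbers will vary with the platform and start method) measures how long it takes just to start and join a process that does no work at all:

import time
from multiprocessing import Process

def do_nothing():
    pass

if __name__ == '__main__':
    n = 20
    start = time.time()
    for _ in range(n):
        p = Process(target=do_nothing)
        p.start()
        p.join()
    total = time.time() - start
    # each iteration pays the full process startup/teardown cost
    # even though the target function does nothing
    print("average per-process overhead: {0:.3f}sec".format(total / n))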

小瓶盖 2025-01-01 18:44:48

This thread has been very useful!

Just a quick observation about the second piece of code provided by David Robinson above (answered Jan 8 '12 at 5:34), which was the code most suitable to my current needs.

In my case I had previous records of the running times of a target function without multiprocessing. When using his code to implement a multiprocessing version, his timefunc(multi) didn't reflect the actual time of multi; rather, it appeared to reflect the time spent in the parent process.

What I did was to externalise the timing, and the time I got looked more like what I expected:

start = time.time()
multi()  # or single()
elapsed = (time.time() - start) / (--number of workers--)
print(elapsed)

In my case, on a dual-core machine, the total time taken by 'x' workers running the target function was roughly half that of running a simple for-loop calling the target function 'x' times.

I am new to multiprocessing, so please treat this observation with caution.
