Is there a simple way to benchmark a Python script?

Posted on 2024-08-08 11:57:26

I usually use the shell command time. My goal is to measure how much time and memory the script uses when the data set is small, medium, large, or very large.

Are there any tools for Linux, or for Python alone, that can do this?

数理化全能战士 2024-08-15 11:57:27

There are several ways to benchmark Python scripts. One simple way to do this is by using the timeit module, which provides a simple way to measure the execution time of small code snippets. However, if you are looking for a more comprehensive benchmark that includes memory usage, you can use the memory_profiler package to measure memory usage.

To visualize your benchmarks, you can use the plotly library, which allows you to create interactive plots. You can create a line chart to display the execution time and memory usage for different input sizes.

Here's an example code snippet to benchmark two different implementations of a function that takes a matrix, row and column as inputs:

import timeit
import random
import numpy as np

from plotly.subplots import make_subplots
import plotly.graph_objects as go


from memory_profiler import memory_usage
from memory_profiler import profile

from my.package.module import real_func_1, real_func_2

@profile
def func_impl_1(matrix, row, column):
    return real_func_1(matrix, row, column)

@profile
def func_impl_2(matrix, row, column):
    return real_func_2(matrix, row, column)


# Analysis range
x = list(range(3, 100))

# Time results
y1 = []
y2 = []

# Memory results
m1 = []
m2 = []


for i in x:
    # Random choice of parameters
    A = np.random.rand(i, i)
    rx = random.randint(0, i-1)
    ry = random.randint(0, i-1)

    t1 = 0
    t2 = 0

    m1_ = 0
    m2_ = 0

    for _ in range(10):
        t1 += timeit.timeit(
            lambda: func_impl_1(A, rx, ry),
            number=1,
        )

        t2 += timeit.timeit(
            lambda: func_impl_2(A, rx, ry),
            number=1,
        )

        m1_ += max(memory_usage(
            (lambda: func_impl_1(A, rx, ry),)
        ))

        m2_ += max(memory_usage(
            (lambda: func_impl_2(A, rx, ry),)
        ))


    # Average over the 10 repetitions of the inner loop
    y1.append(t1/10)
    y2.append(t2/10)

    m1.append(m1_/10)
    m2.append(m2_/10)

# Two stacked subplots sharing the x axis: time on top, memory below

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, subplot_titles=("Time", "Memory"))

fig.add_trace(go.Scatter(x=x, y=y1, name='func_impl_1 time', legendgroup='1'), row=1, col=1)
fig.add_trace(go.Scatter(x=x, y=y2, name='func_impl_2 time', legendgroup='1'), row=1, col=1)

fig.add_trace(go.Scatter(x=x, y=m1, name='func_impl_1 memory', legendgroup='2'), row=2, col=1)
fig.add_trace(go.Scatter(x=x, y=m2, name='func_impl_2 memory', legendgroup='2'), row=2, col=1)


fig.update_layout(
    title="Performance of the functions",
    xaxis_title="Matrix size",
)

fig.update_yaxes(title_text="Time (s)", row=1, col=1)
fig.update_yaxes(title_text="Max Memory usage (MB)", row=2, col=1)

fig.show()

The graph:
[Figure: time and memory benchmark]

Looking at the graph, it seems like both functions have similar memory usage, which is good to know. In terms of runtime, func_impl_2 seems to be generally faster than func_impl_1, which is also a positive finding. However, the difference in performance between the two functions is quite small, and there is a point where the performance of func_impl_1 surpasses that of func_impl_2 for very small input sizes. This may indicate that the simpler implementation of func_impl_1 is still a viable option for smaller inputs, even though func_impl_2 is generally faster. Overall, the graphs provide valuable insights into the performance of these functions and can help with decision-making when choosing which implementation to use in different scenarios.
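
If installing memory_profiler and plotly is not an option, a rough version of the same measurement can be sketched with the standard library alone. The names build_squares and benchmark below are made up for illustration and are not part of the answer above. Note also that tracemalloc only sees allocations made through Python's allocator, so its numbers are not directly comparable to the process-level figures reported by memory_usage.

```python
import timeit
import tracemalloc


def build_squares(n):
    """Hypothetical workload: build a list of n squared integers."""
    return [i * i for i in range(n)]


def benchmark(func, sizes, repeats=5):
    """Return one (size, best_seconds, peak_bytes) tuple per input size."""
    rows = []
    for n in sizes:
        # Best of several single runs; the minimum is least disturbed by noise.
        best = min(timeit.repeat(lambda: func(n), number=1, repeat=repeats))

        # Measure peak Python-level allocations in a separate, untimed run,
        # because tracing slows the code down.
        tracemalloc.start()
        func(n)
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        rows.append((n, best, peak))
    return rows


results = benchmark(build_squares, sizes=[1_000, 10_000, 100_000])
for n, seconds, peak in results:
    print(f"n={n:>7}  best={seconds:.6f} s  peak={peak / 1024:.1f} KiB")
```

The rows can be fed to any plotting library, or simply printed as above, to see how time and memory grow with the input size.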

萌能量女王 2024-08-15 11:57:27

An easy way to quickly time any statement or function call is the %timeit magic, which is available in IPython and Jupyter:
%timeit my_code

For instance:

%timeit a = 1

13.4 ns ± 0.781 ns per loop (mean ± std. dev. of 7 runs, 100000000 loops each)
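
Outside IPython, something close to %timeit can be approximated in a plain script with timeit.Timer.autorange(), which chooses the loop count for you. This is only a sketch of the idea, not what %timeit does internally, and it reports a single measurement rather than a mean and standard deviation.

```python
import timeit

timer = timeit.Timer("a = 1")

# autorange() keeps increasing the loop count until one batch takes
# at least 0.2 seconds, then returns (loop_count, total_seconds).
loops, total = timer.autorange()
print(f"{total / loops * 1e9:.1f} ns per loop ({loops} loops)")
```

From a shell, python -m timeit "a = 1" gives a similar one-line report without writing any script.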
命比纸薄 2024-08-15 11:57:27

Be careful: timeit can look very slow. With the default number=1000000 it took 12 seconds on my mid-range processor, because the statement is executed one million times. You can test this with the code from the accepted answer:

def test():
    lst = []
    for i in range(100):
        lst.append(i)

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test"))  # about 12 seconds in total for the default 1,000,000 runs

For simple things I use time instead. On my PC a single call returns a result of roughly 0.0 seconds:

import time

def test():
    lst = []
    for i in range(100):
        lst.append(i)

t1 = time.time()

test()

result = time.time() - t1
print(result) # 0.000000xxxx
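
The two approaches can be reconciled, as the sketch below tries to show. It assumes that time.perf_counter() is an acceptable replacement for time.time() (it is a monotonic clock with higher resolution) and that 10,000 runs is enough for this workload. build_list mirrors test() from above and was renamed only for this example.

```python
import time
import timeit


def build_list():
    lst = []
    for i in range(100):
        lst.append(i)


# One-off measurement with a high-resolution monotonic clock.
start = time.perf_counter()
build_list()
single = time.perf_counter() - start

# The same function through timeit, with an explicit and modest run count
# instead of the default of one million.
runs = 10_000
total = timeit.timeit(build_list, number=runs)

print(f"single call: {single:.9f} s")
print(f"timeit mean: {total / runs:.9f} s over {runs} runs")
```

Dividing the timeit total by the run count gives a per-call figure that is usually steadier than a single stopwatch reading.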
£烟消云散 2024-08-15 11:57:27

Based on Danyun Liu's answer, with some convenience features added. Perhaps it is useful to someone.

def stopwatch(repeat=1, autorun=True):
    """
    stopwatch decorator to calculate the total time of a function
    """
    import timeit
    import functools
    
    def outer_func(func):
        @functools.wraps(func)
        def time_func(*args, **kwargs):
            t1 = timeit.default_timer()
            for _ in range(repeat):
                r = func(*args, **kwargs)
            t2 = timeit.default_timer()
            print(f"Function={func.__name__}, Time={t2 - t1}")
            return r
        
        if autorun:
            try:
                time_func()
            except TypeError:
                raise Exception(f"{time_func.__name__}: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)") from None
        
        return time_func
    
    if callable(repeat):
        func = repeat
        repeat = 1
        return outer_func(func)
    
    return outer_func

Some tests:

def is_in_set(x):
    return x in {"linux", "darwin"}

def is_in_list(x):
    return x in ["linux", "darwin"]

@stopwatch
def run_once():
    import time
    time.sleep(0.5)

@stopwatch(autorun=False)
def run_manually():
    import time
    time.sleep(0.5)

run_manually()

@stopwatch(repeat=10000000)
def repeat_set():
    is_in_set("windows")
    is_in_set("darwin")

@stopwatch(repeat=10000000)
def repeat_list():
    is_in_list("windows")
    is_in_list("darwin")

@stopwatch
def should_fail(x):
    pass

Result:

Function=run_once, Time=0.5005391679987952
Function=run_manually, Time=0.500624185999186
Function=repeat_set, Time=1.7064883739985817
Function=repeat_list, Time=1.8905151920007484
Traceback (most recent call last):
  (some more traceback here...)
Exception: should_fail: autorun only works with no parameters, you may want to use @stopwatch(autorun=False)
薄暮涼年 2024-08-15 11:57:27

I wrote a tool to run a concurrency stress test on a given function; its output is similar to that of Apache AB (ab). Maybe this is what you want:

import itertools
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def create_counter():
    """
    Atomic counter
    """
    return itertools.count()


def get_and_increase(counter):
    return next(counter)


class Context:
    def __init__(self, num_threads, target_fun):
        self.failed_counter = create_counter()
        self.start_barrier = threading.Barrier(num_threads)
        self.target_fun = target_fun


class Worker:
    def __init__(self, context, num_per_thread):
        self.context = context
        self.num_per_thread = num_per_thread
        self.time_takes_arr = []

    def run(self):
        target_fun = self.context.target_fun
        start_barrier = self.context.start_barrier
        failed_counter = self.context.failed_counter
        time_takes_arr = self.time_takes_arr
        start_barrier.wait()
        for i in range(self.num_per_thread):
            start = time.time_ns()
            try:
                target_fun()
            except Exception as err:
                get_and_increase(failed_counter)
            finally:
                time_takes_arr.append(time.time_ns() - start)


def test(num, num_threads, target_fun, num_warm_up=0):
    if num_warm_up > 0:
        for i in range(num_warm_up):
            target_fun()

    executor = ThreadPoolExecutor(max_workers=num_threads)

    num_per_thread = num // num_threads
    context = Context(num_threads, target_fun)
    workers = [Worker(context, num_per_thread) for _ in range(num_threads)]

    for worker in workers:
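        # Submit the bound method itself; a lambda would look up `worker` late
        # and could run the wrong worker after the loop has moved on.
        executor.submit(worker.run)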
    executor.shutdown(wait=True)

    act_num = num_per_thread * num_threads
    failed_num = get_and_increase(context.failed_counter)

    time_takes_all = []
    time_takes_thread = []
    for worker in workers:
        time_takes_arr = worker.time_takes_arr
        time_takes_thread.append(sum(time_takes_arr))
        time_takes_all.extend(time_takes_arr)
    time_takes_all.sort()
    time_takes_thread.sort()

    total_time_takes = time_takes_thread[-1]

    return num_threads, total_time_takes, act_num, failed_num, time_takes_all


def format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all):
    idx50 = act_num // 2
    idx66 = act_num * 66 // 100
    idx75 = act_num * 75 // 100
    idx80 = act_num * 80 // 100
    idx90 = act_num * 90 // 100
    idx95 = act_num * 95 // 100
    idx98 = act_num * 98 // 100
    idx99 = act_num * 99 // 100

    time_sum = sum(time_takes_all)
    tps = 1000_000_000 * num_threads * (act_num / time_sum)
    time_takes_avg = time_sum / len(time_takes_all) / 1000_000

    return f"""{test_name} test result:
 Concurrency Level: {num_threads:d}
 Time taken for tests:  {total_time_takes / 1000000:.6f} ms
 Complete Tasks:    {act_num:,}
 Failed Tasks:      {failed_num:,}
 Tasks per second:  {tps:,.2f}
 Time per task:     {time_takes_avg:.9f} ms
 Time per task:     {time_takes_avg / num_threads} ms (across all concurrent tasks)
 Shortest task:     {time_takes_all[0] / 1000000:.9f} ms
 Percentage of the tasks served within a certain time (ms)
  50%   {time_takes_all[idx50] / 1000000:.6f}
  66%   {time_takes_all[idx66] / 1000000:.6f}
  75%   {time_takes_all[idx75] / 1000000:.6f}
  80%   {time_takes_all[idx80] / 1000000:.6f}
  90%   {time_takes_all[idx90] / 1000000:.6f}
  95%   {time_takes_all[idx95] / 1000000:.6f}
  98%   {time_takes_all[idx98] / 1000000:.6f}
  99%   {time_takes_all[idx99] / 1000000:.6f}
 100%   {time_takes_all[-1] / 1000000:.6f} (longest task)"""


def test_and_print(test_name, num, num_threads, target_fun, num_warm_up=10):
    num_threads, total_time_takes, act_num, failed_num, time_takes_all = test(num, num_threads, target_fun, num_warm_up)
    print(format(test_name, num_threads, total_time_takes, act_num, failed_num, time_takes_all))


if __name__ == "__main__":
    c1 = create_counter()


    def task():
        get_and_increase(c1)


    test_and_print("My test",
                   7000000, 50
                   , task
                   , 20)
    print()
    print(f"c1:{get_and_increase(c1)}")

and the output will be:

My test test result:
 Concurrency Level: 50
 Time taken for tests:  1544.781000 ms
 Complete Tasks:    7,000,000
 Failed Tasks:      0
 Tasks per second:  10,133,790.35
 Time per task:     0.004933988 ms
 Time per task:     9.867976000000001e-05 ms (across all concurrent tasks)
 Shortest task:     -0.003000000 ms
 Percentage of the tasks served within a certain time (ms)
  50%   0.000000
  66%   0.000000
  75%   0.000000
  80%   0.000000
  90%   0.001000
  95%   0.001000
  98%   0.001000
  99%   0.001000
 100%   1120.871000 (longest task)
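
The percentile table can also be derived with statistics.quantiles instead of hand-computed indexes. The helper latency_percentiles and the sample data below are hypothetical, and the "inclusive" interpolation method will not match the index-based values of the tool above exactly.

```python
import statistics


def latency_percentiles(durations_ns, points=(50, 90, 99)):
    """Map each requested percentile to a latency in milliseconds."""
    # n=100 yields 99 cut points; cut point p-1 is the p-th percentile.
    cuts = statistics.quantiles(durations_ns, n=100, method="inclusive")
    return {p: cuts[p - 1] / 1_000_000 for p in points}


# Hypothetical sample: 101 task durations from 1 ms to 101 ms, in nanoseconds.
sample = [ms * 1_000_000 for ms in range(1, 102)]
print(latency_percentiles(sample))
```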
春庭雪 2024-08-15 11:57:26

Have a look at timeit, the python profiler and pycallgraph. Also make sure to have a look at the comment below by nikicc mentioning "SnakeViz". It gives you yet another visualisation of profiling data which can be helpful.

timeit

def test():
    """Stupid test function"""
    lst = []
    for i in range(100):
        lst.append(i)

if __name__ == '__main__':
    import timeit
    print(timeit.timeit("test()", setup="from __main__ import test"))

    # For Python>=3.5 one can also write:
    print(timeit.timeit("test()", globals=locals()))

Essentially, you pass it Python code as a string, and it runs that code the specified number of times and returns the total execution time. The important bits from the docs:

timeit.timeit(stmt='pass', setup='pass', timer=<default timer>, number=1000000, globals=None)
Create a Timer instance with the given statement, setup
code and timer function and run its timeit method with
number executions. The optional globals argument specifies a namespace in which to execute the code.

... and:

Timer.timeit(number=1000000)
Time number executions of the main statement. This executes the setup
statement once, and then returns the time it takes to execute the main
statement a number of times, measured in seconds as a float.
The argument is the number of times through the loop, defaulting to one
million. The main statement, the setup statement and the timer function
to be used are passed to the constructor.

Note:
By default, timeit temporarily turns off garbage collection during the timing. The advantage of this approach is that
it makes independent timings more comparable. The disadvantage is
that GC may be an important component of the performance of the
function being measured. If so, GC can be re-enabled as the first
statement in the setup string. For example:

timeit.Timer('for i in range(10): oct(i)', 'gc.enable()').timeit()
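
To see whether garbage collection matters for a given function, the same statement can be timed both ways. The workload make_pairs is invented for this sketch. One detail to be aware of: once a custom globals namespace is passed, the setup string has to import gc itself.

```python
import timeit


def make_pairs(n=1_000):
    """Hypothetical workload that allocates many small container objects."""
    return [(i, [i]) for i in range(n)]


runs = 200

# Default behaviour: garbage collection is switched off while timing.
gc_off = timeit.timeit("make_pairs()", globals=globals(), number=runs)

# Re-enable GC in the setup string so that collections count towards the time.
gc_on = timeit.timeit(
    "make_pairs()",
    setup="import gc; gc.enable()",
    globals=globals(),
    number=runs,
)

print(f"GC disabled: {gc_off:.4f} s, GC enabled: {gc_on:.4f} s for {runs} runs")
```

A clear gap between the two numbers suggests that collector activity is part of the real cost of the function.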

Profiling

Profiling will give you a much more detailed idea about what's going on. Here's the "instant example" from the official docs:

import cProfile
import re
cProfile.run('re.compile("foo|bar")')

Which will give you:

      197 function calls (192 primitive calls) in 0.002 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    0.000    0.000    0.001    0.001 <string>:1(<module>)
     1    0.000    0.000    0.001    0.001 re.py:212(compile)
     1    0.000    0.000    0.001    0.001 re.py:268(_compile)
     1    0.000    0.000    0.000    0.000 sre_compile.py:172(_compile_charset)
     1    0.000    0.000    0.000    0.000 sre_compile.py:201(_optimize_charset)
     4    0.000    0.000    0.000    0.000 sre_compile.py:25(_identityfunction)
   3/1    0.000    0.000    0.000    0.000 sre_compile.py:33(_compile)

Both of these modules should give you an idea about where to look for bottlenecks.
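
For anything larger than a one-liner, the default "standard name" ordering is rarely what you want. The sketch below sorts by cumulative time and keeps the top ten entries, using pstats from the standard library. The function compile_patterns is a made-up workload, not something from the docs.

```python
import cProfile
import io
import pstats
import re


def compile_patterns(count=200):
    """Hypothetical workload: compile many distinct regular expressions."""
    return [re.compile(f"foo{i}|bar{i}") for i in range(count)]


profiler = cProfile.Profile()
profiler.enable()
compile_patterns()
profiler.disable()

# Sort by cumulative time and keep only the ten most expensive entries.
buffer = io.StringIO()
stats = pstats.Stats(profiler, stream=buffer)
stats.sort_stats("cumulative").print_stats(10)
report = buffer.getvalue()
print(report)
```

For a whole script, python -m cProfile -s cumulative myscript.py gives the same ordering without changing the code.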

Also, to get to grips with the output of profile, have a look at this post

pycallgraph

NOTE: pycallgraph has been officially abandoned since Feb. 2018. As of Dec. 2020 it was still working on Python 3.6, though. As long as there are no core changes in how Python exposes the profiling API, it should remain a helpful tool.

This module uses graphviz to create callgraphs like the following:

[Figure: callgraph example]

You can easily see which paths used up the most time by colour. You can either create them using the pycallgraph API, or using a packaged script:

pycallgraph graphviz -- ./mypythonscript.py

The overhead is quite considerable though. So for already long-running processes, creating the graph can take some time.

み格子的夏天 2024-08-15 11:57:26

I use a simple decorator to time a function:

import time

def st_time(func):
    """
        st decorator to calculate the total time of a func
    """

    def st_func(*args, **keyArgs):
        t1 = time.time()
        r = func(*args, **keyArgs)
        t2 = time.time()
        print("Function=%s, Time=%s" % (func.__name__, t2 - t1))
        return r

    return st_func
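
When only part of a function needs timing, the same idea can be written as a context manager. This is a variation on the decorator rather than the author's code; the name timed and the optional results dictionary are assumptions made for the example.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results=None):
    """Time the enclosed block; optionally store the duration in `results`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if results is not None:
            results[label] = elapsed
        print(f"Block={label}, Time={elapsed:.6f}")


timings = {}
with timed("sum_of_squares", timings):
    total = sum(i * i for i in range(100_000))
```

Collecting the durations in a dictionary makes it easy to compare several blocks after the run.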
此刻的回忆 2024-08-15 11:57:26

The timeit module was slow and weird, so I wrote this:

def timereps(reps, func):
    from time import time
    start = time()
    for i in range(0, reps):
        func()
    end = time()
    return (end - start) / reps

Example:

import os
listdir_time = timereps(10000, lambda: os.listdir('/'))
print("python can do %d os.listdir('/') per second" % (1 / listdir_time))

For me, it says:

python can do 40925 os.listdir('/') per second

This is a primitive sort of benchmarking, but it's good enough.

楠木可依 2024-08-15 11:57:26

I usually do a quick time ./script.py to see how long it takes. That does not show you the memory though, at least not as a default. You can use /usr/bin/time -v ./script.py to get a lot of information, including memory usage.
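
Roughly the same numbers can be collected from Python with the resource module, which is Unix-only. In this sketch the child command is a stand-in for ./script.py, and it assumes that no other child processes have been waited for earlier, since RUSAGE_CHILDREN aggregates over all of them.

```python
import resource
import subprocess
import sys
import time

# Stand-in for "./script.py": a child Python process that allocates a list.
command = [sys.executable, "-c", "data = [0] * 5_000_000"]

start = time.perf_counter()
subprocess.run(command, check=True)
elapsed = time.perf_counter() - start

# Resource usage of terminated, waited-for children. ru_maxrss is the
# peak resident set size: kilobytes on Linux, bytes on macOS.
usage = resource.getrusage(resource.RUSAGE_CHILDREN)
print(f"wall time: {elapsed:.3f} s")
print(f"user CPU:  {usage.ru_utime:.3f} s")
print(f"peak RSS:  {usage.ru_maxrss} (kilobytes on Linux)")
```

Running this once per data-set size gives a time and peak-memory pair for each size.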

夏の忆 2024-08-15 11:57:26

Memory Profiler for all your memory needs.

https://pypi.python.org/pypi/memory_profiler

Run a pip install:

pip install memory_profiler

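Import the decorator (optional when the script is run with python -m memory_profiler, which makes profile available automatically):

from memory_profiler import profile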

Add a decorator to the item you wish to profile:

@profile
def my_func():
    a = [1] * (10 ** 6)
    b = [2] * (2 * 10 ** 7)
    del b
    return a

if __name__ == '__main__':
    my_func()

Execute the code:

python -m memory_profiler example.py

Receive the output:

 Line #    Mem usage  Increment   Line Contents
 ==============================================
 3                           @profile
 4      5.97 MB    0.00 MB   def my_func():
 5     13.61 MB    7.64 MB       a = [1] * (10 ** 6)
 6    166.20 MB  152.59 MB       b = [2] * (2 * 10 ** 7)
 7     13.61 MB -152.59 MB       del b
 8     13.61 MB    0.00 MB       return a

Examples are from the docs, linked above.
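
If a third-party package is not an option, the standard library's tracemalloc gives a comparable, though not identical, picture: it tracks allocations made by Python itself rather than the memory of the whole process. The sketch below reuses my_func with a smaller second list, an arbitrary change to keep the run light.

```python
import tracemalloc


def my_func():
    a = [1] * (10 ** 6)
    b = [2] * (2 * 10 ** 6)  # smaller than in the example above
    del b
    return a


tracemalloc.start()
result = my_func()
current, peak = tracemalloc.get_traced_memory()
snapshot = tracemalloc.take_snapshot()
tracemalloc.stop()

print(f"current: {current / 2**20:.2f} MiB, peak: {peak / 2**20:.2f} MiB")

# The three source lines holding the most live memory at snapshot time.
for stat in snapshot.statistics("lineno")[:3]:
    print(stat)
```

The peak value includes the temporary list b, while the snapshot only lists memory that is still alive, which here is a.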

风情万种。 2024-08-15 11:57:26

memory_profiler (memory usage line by line)

Installation

pip install memory_profiler

Usage

  • Add a @profile decorator before the function. For example:
@profile
def function():
    result = []
    for i in range(10000):
        result.append(i)
    return result
  • Use the command python -m memory_profiler <file_name> to print the benchmark results. For example:
python -m memory_profiler test.py

You will see detailed info about each line of code:

Filename: test.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     1   40.246 MiB   40.246 MiB           1   @profile
     2                                         def function():
     3   40.246 MiB    0.000 MiB           1       result = []
     4   40.758 MiB    0.008 MiB       10001       for i in range(10000):
     5   40.758 MiB    0.504 MiB       10000           result.append(i)
     6   40.758 MiB    0.000 MiB           1       return result

Good practice

Call the function many times to minimize the impact of the environment.

line_profiler (execution time line by line)

Installation

pip install line_profiler

Usage

  • Add a @profile decorator before the function. For example:
@profile
def function(base, index, shift):
    addend = index << shift
    result = base + addend
    return result
  • Run kernprof -l <file_name> to execute the script under line_profiler. For example:
kernprof -l test.py

kernprof will print Wrote profile results to <file_name>.lprof on success. For example:

Wrote profile results to test.py.lprof
  • Run python -m line_profiler <file_name>.lprof to print the profiling results. For example:
python -m line_profiler test.py.lprof

You will see detailed info about each line of code:

Timer unit: 1e-06 s

Total time: 0.0021632 s
File: test.py
Function: function at line 1

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
     1                                           @profile
     2                                           def function(base, index, shift):
     3      1000        796.4      0.8     36.8      addend = index << shift
     4      1000        745.9      0.7     34.5      result = base + addend
     5      1000        620.9      0.6     28.7      return result

memory_profiler (memory usage line by line)

Installation

pip install memory_profiler

Usage

  • Add a @profile decorator before the function. For example:
@profile
def function():
    result = []
    for i in range(10000):
        result.append(i)
    return result
  • Run python -m memory_profiler <file_name> to print the profiling results. For example:
python -m memory_profiler test.py

You will see detailed info about each line of code:

Filename: test.py

Line #    Mem usage    Increment  Occurences   Line Contents
============================================================
     1   40.246 MiB   40.246 MiB           1   @profile
     2                                         def function():
     3   40.246 MiB    0.000 MiB           1       result = []
     4   40.758 MiB    0.008 MiB       10001       for i in range(10000):
     5   40.758 MiB    0.504 MiB       10000           result.append(i)
     6   40.758 MiB    0.000 MiB           1       return result

Good Practice

Call the function many times so that noise from the environment (other processes, caches, CPU frequency changes) averages out.
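
One way to follow that advice with only the standard library is timeit.repeat, which runs several independent batches of calls. The sketch below reuses the function from the line_profiler example; the argument values and the repeat/number settings are arbitrary choices for illustration:

```python
import timeit


def function(base, index, shift):
    addend = index << shift
    result = base + addend
    return result


CALLS_PER_RUN = 100_000

# Five independent runs, each timing CALLS_PER_RUN calls in a row.
timings = timeit.repeat(
    lambda: function(10, 3, 2),
    repeat=5,
    number=CALLS_PER_RUN,
)

# The minimum is usually the most representative figure: slower runs
# are typically caused by interference from other processes.
best = min(timings) / CALLS_PER_RUN
print(f"best of {len(timings)} runs: {best * 1e9:.0f} ns per call")
```

Note that the lambda adds a small constant call overhead to every measurement, so compare numbers obtained the same way.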

俏︾媚 2024-08-15 11:57:26

snakeviz interactive viewer for cProfile

https://github.com/jiffyclub/snakeviz/

cProfile was mentioned at https://stackoverflow.com/a/1593034/895245 and snakeviz was mentioned in a comment, but I wanted to highlight it further.

It is very hard to debug program performance just by looking at cProfile / pstats output, because out of the box they can only show total times per function.

However, what we really need in general is to see a nested view containing the stack traces of each call to actually find the main bottlenecks easily.

And this is exactly what snakeviz provides via its default "icicle" view.

First you have to dump the cProfile data to a binary file, and then you can run snakeviz on it:

pip install -U snakeviz
python -m cProfile -o results.prof myscript.py
snakeviz results.prof

This prints a URL to stdout, which you can open in your browser to see the icicle view.

In that view you can:

  • hover over each box to see the full path to the file that contains the function
  • click on a box to make that box show up on the top as a way to zoom in

A more profiling-oriented question: How can you profile a Python script?
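
If you would rather profile a single function than a whole script, the same kind of stats file can be written from inside Python. A minimal sketch using only the standard library; slow_sum and work are hypothetical stand-ins for your own code, and the file is written to the temp directory purely for illustration:

```python
import cProfile
import os
import pstats
import tempfile


def slow_sum(n):
    return sum(i * i for i in range(n))


def work():
    return [slow_sum(20_000) for _ in range(20)]


profiler = cProfile.Profile()
totals = profiler.runcall(work)  # profile exactly one call to work()

# Binary stats file, same format as `python -m cProfile -o` produces.
stats_path = os.path.join(tempfile.gettempdir(), "results.prof")
profiler.dump_stats(stats_path)

stats = pstats.Stats(profiler)
stats.sort_stats("cumulative").print_stats(5)  # five most expensive entries
stats.print_callers("slow_sum")  # who called slow_sum, as plain text
```

The resulting file can then be passed to snakeviz in the same way as results.prof above.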

仙女山的月亮 2024-08-15 11:57:26

If you don't want to write boilerplate code for timeit and you want results that are easy to analyze, take a look at benchmarkit. It also saves the history of previous runs, so it is easy to compare the same function over the course of development.

# pip install benchmarkit

from benchmarkit import benchmark, benchmark_run

N = 10000
seq_list = list(range(N))
seq_set = set(range(N))

SAVE_PATH = '/tmp/benchmark_time.jsonl'

@benchmark(num_iters=100, save_params=True)
def search_in_list(num_items=N):
    return num_items - 1 in seq_list

@benchmark(num_iters=100, save_params=True)
def search_in_set(num_items=N):
    return num_items - 1 in seq_set

benchmark_results = benchmark_run(
   [search_in_list, search_in_set],
   SAVE_PATH,
   comment='initial benchmark search',
)  

This prints the results to the terminal and returns a list of dictionaries with the data for the last run. Command-line entry points are also available.

If you change N to 1000000 and rerun, the saved history lets you compare the new timings with the previous run.
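
For comparison, the hand-written timeit boilerplate that such a tool replaces might look roughly like this. It is a plain standard-library sketch, not benchmarkit's implementation; the dictionary keys "name" and "mean_time" are made up for illustration and nothing is saved to disk:

```python
import timeit

N = 10000
NUM_ITERS = 100
seq_list = list(range(N))
seq_set = set(range(N))


def search_in_list(num_items=N):
    return num_items - 1 in seq_list


def search_in_set(num_items=N):
    return num_items - 1 in seq_set


results = []
for func in (search_in_list, search_in_set):
    # timeit returns the total time for NUM_ITERS calls, in seconds.
    total = timeit.timeit(func, number=NUM_ITERS)
    results.append({"name": func.__name__, "mean_time": total / NUM_ITERS})

for row in results:
    print(f"{row['name']:<16} {row['mean_time'] * 1e6:10.2f} us per call")
```

Everything past this point (persisting runs, comparing against history, formatting a table) is what you would otherwise have to write yourself.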

戒ㄋ 2024-08-15 11:57:26

Have a look at nose and at one of its plugins, this one in particular.

Once installed, nose is a script in your path that you can call in a directory which contains some Python scripts:

$: nosetests

This will look through all the Python files in the current directory and execute any function that it recognizes as a test: for example, it treats any function with test_ in its name as a test.

So you can just create a Python script called test_yourfunction.py and write something like this in it:

$: cat > test_yourfunction.py

def test_smallinput():
    yourfunction(smallinput)

def test_mediuminput():
    yourfunction(mediuminput)

def test_largeinput():
    yourfunction(largeinput)

Then you have to run

$: nosetests --with-profile --profile-stats-file yourstatsprofile.prof test_yourfunction.py

and to read the profile file, use this Python one-liner (the hotshot module exists only in Python 2; it was removed in Python 3):

python -c "import hotshot.stats ; stats = hotshot.stats.load('yourstatsprofile.prof') ; stats.sort_stats('time', 'calls') ; stats.print_stats(200)"
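
On Python 3, a rough analogue of that one-liner uses pstats in place of hotshot.stats. This sketch assumes the stats file was written by cProfile rather than by the nose plugin; yourfunction and the two inputs are hypothetical placeholders matching the names in the test file above:

```python
import cProfile
import os
import pstats
import tempfile


def yourfunction(data):
    # Placeholder for the code you actually want to measure.
    return sorted(data, reverse=True)


smallinput = list(range(1_000))
largeinput = list(range(200_000))

stats_path = os.path.join(tempfile.gettempdir(), "yourstatsprofile.prof")

# Collect both runs in one profile and write it to disk.
profiler = cProfile.Profile()
profiler.runcall(yourfunction, smallinput)
profiler.runcall(yourfunction, largeinput)
profiler.dump_stats(stats_path)

# Same sort order and row limit as the hotshot one-liner.
stats = pstats.Stats(stats_path)
stats.sort_stats("time", "calls").print_stats(200)
```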