python 3矢量化嵌套以循环,其中内部循环取决于参数

发布于 2025-02-04 02:14:42 字数 765 浏览 4 评论 0原文

在将代码从fortran到python移植时,我看到这些嵌套的变体(有时是双嵌套,有时是三重嵌套),我想矢量化(此处显示为最低可再现的示例)

import numpy as np
import sys
import math
def main():
    t = np.arange(0,300)
    n1=7
    tc = test(n1,t)

def test(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]



  return  tChunked

main()

我尝试了什么?

我已经到达了iStart并获得j并使用外部添加来获得istart+j。但是,我如何使用索引k在一行中获得2D tchunk的数组是我被卡住的地方。

istart = np.linspace(0,math.ceil(n1*n2/2),num=n1,endpoint=False,dtype=np.int32)
jstart = np.linspace(0,n2,num=n2,endpoint=False,dtype=np.int32)

k = jstart[:,np.newaxis]+istart

In the geosciensces while porting code from Fortran to python I see variations of these nested for loops(sometimes double nested and sometimes triple nested) that I would like to vectorize(shown here as an minimum reproducible example)

import numpy as np
import sys
import math
def main():
    t = np.arange(0,300)
    n1=7
    tc = test(n1,t)

def test(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]



  return  tChunked

main()

What have I tried ?

I have gotten as far as elminating the istart and getting j and using outer addition to get istart+j. But how do I use the index k to get a 2d tChunked array in a single line is where I am stuck.

istart = np.linspace(0,math.ceil(n1*n2/2),num=n1,endpoint=False,dtype=np.int32)
jstart = np.linspace(0,n2,num=n2,endpoint=False,dtype=np.int32)

k = jstart[:,np.newaxis]+istart

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

还不是爱你 2025-02-11 02:14:43

如果索引为2D,Numpy将输出2D数组。因此,您只需这样做。

def test2(n1, t):
    n2 = int(2 * t.size / (n1 + 1))
    istart = np.linspace(0, math.ceil(n1 * n2 / 2), num=n1, endpoint=False, dtype=np.int32)
    jstart = np.linspace(0, n2, num=n2, endpoint=False, dtype=np.int32)
    k = istart[:, np.newaxis] + jstart  # Note: I switched i and j.

    tChunked = t[k]  # This creates an array of the same shape as k.

    return tChunked

numpy will output a 2D array if the index is 2D. So you simply do this.

def test2(n1, t):
    n2 = int(2 * t.size / (n1 + 1))
    istart = np.linspace(0, math.ceil(n1 * n2 / 2), num=n1, endpoint=False, dtype=np.int32)
    jstart = np.linspace(0, n2, num=n2, endpoint=False, dtype=np.int32)
    k = istart[:, np.newaxis] + jstart  # Note: I switched i and j.

    tChunked = t[k]  # This creates an array of the same shape as k.

    return tChunked
纵情客 2025-02-11 02:14:43

如果您必须处理很多嵌套的循环,也许解决方案是使用 numba ,因为它可以结果比本地numpy更好的性能。特别是针对您所展示的非Python功能。

尽可能容易:

from numba import njit

@njit
def test(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]

If you have to deal with a lot of nested loops, maybe the solution is to use numba as it can result in better performance than native numpy. Specially for non-python functions like the one you showed.

As easy as:

from numba import njit

@njit
def test(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]
故人的歌 2025-02-11 02:14:43

是什么让我爱上了python的头超越实际上是Numpy,特别是它的惊人索引 and indexing例程

test_extra_crispy()中,我们可以使用zip()连续将我们的鸭子(初始条件)获取,然后使用偏移索引进行索引来进行值的“移植”值块:

i_values = np.arange(7)
istarts = (i_values * n2 / 2).astype(int)
for i, istart in zip(i_values, istarts):
    tChunked[i, :n2] = t[istart:istart+n2]

另请参阅

https://stackoverflow.com/q/33005391/3904031 "

t = np.arange(10000000)
n1 = 7

> best 比原始的(91 vs 4246毫秒),但仅比test2()来自 a>考虑到它比我的蛮力治疗更谨慎的检查

/ 拥有一个巨大的数组,然后磁盘空间,访问时间成为一个真正的问题。

“只使用jit”?

zaero divide的答案指出,@Jit在将不切实际的Python嵌套循环转换为快速编译的不切实际的Python嵌套循环方面有很长的路要走。

但是,请务必查看Numpy中已经有哪些方法可以通过编译代码有效地运行。可以轻松地在笔记本电脑上获得gigaflop,以解决Numpy的许多方便方法。

在我的情况下,test_jit()速度比“额外脆皮”(350 vs 91 ms)慢四倍!

如果您不需要,您不应该编写取决于@JIT的脚本;并非每个人都想“只使用@Jit”。

奇怪形状的索引

如果您需要在数组中解决更随机的卷,则可以使用这样的索引:

array = np.array([[0, 0, 1, 0, 0], [0, 1, 0, 1, 0], [1, 0, 0, 0, 1], [0, 1, 0, 1, 0], [0, 0, 1, 0, 0]])
print(array)

给予

[[0 0 1 0 0]
 [0 1 0 1 0]
 [1 0 0 0 1]
 [0 1 0 1 0]
 [0 0 1 0 0]]

,我们可以为1这样的索引获得索引:

i, j =  np.where(array == 1)
print(i)
print(j)

则可以给予

如果我们想从零数组开始并插入, 。这些1通过numpy索引,只需执行此操作

array = np.zeros((5, 5), dtype=int)
array[i, j] = 1

即可重新创建原始数组。


import numpy as np
import matplotlib.pyplot as plt
import time

def test_original(n1, t):
    n2 = int(2*t.size / (n1 + 1))
    tChunked = np.zeros(shape = (n1, n2))
    for i in range(n1):
        istart = int(i * n2 / 2)
        for j in range(0, n2):
            tChunked[i, j] = t[istart + j]
    return  tChunked

t = np.arange(10000000)
n1 = 7

t_start = time.process_time()

tc_original = test_original(n1, t)

print('original process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_original.shape: ', tc_original.shape)

fig, ax = plt.subplots(1, 1)
for thing in tc_original:
    ax.plot(thing)
plt.show()

def test_extra_crispy(n1, t):
    n2 = int(2*t.size / (n1 + 1))
    tChunked = np.zeros(shape = (n1, n2))
    i_values = np.arange(7)
    istarts = (i_values * n2 / 2).astype(int)
    for i, istart in zip(i_values, istarts):
        tChunked[i, :n2] = t[istart:istart+n2]
    return  tChunked


t_start = time.process_time()

tc_extra_crispy = test_extra_crispy(n1, t)

print('extra crispy process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_extra_crispy.shape: ', tc_extra_crispy.shape)

print('np.all(tc_extra_crispy == tc_original): ', np.all(tc_extra_crispy == tc_original))

import math

def test2(n1, t): # https://stackoverflow.com/a/72492815/3904031
    n2 = int(2 * t.size / (n1 + 1))
    istart = np.linspace(0, math.ceil(n1 * n2 / 2), num=n1, endpoint=False, dtype=np.int32)
    jstart = np.linspace(0, n2, num=n2, endpoint=False, dtype=np.int32)
    k = istart[:, np.newaxis] + jstart  # Note: I switched i and j.

    tChunked = t[k]  # This creates an array of the same shape as k.

    return tChunked

t_start = time.process_time()

tc_test2 = test2(n1, t)

print('test2 process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_test2.shape: ', tc_test2.shape)

print('np.all(tc_test2 == tc_original): ', np.all(tc_test2 == tc_original))

# from https://stackoverflow.com/a/72494777/3904031

from numba import njit

@njit
def test_jit(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]
    return tChunked


t_start = time.process_time()

tc_jit = test_jit(n1, t)

print('jit process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_jit.shape: ', tc_jit.shape)

print('np.all(tc_jit == tc_original): ', np.all(tc_jit == tc_original))

What made me fall head-over-heels in love with Python was actually NumPy and specifically its amazing indexing and indexing routines!

In test_extra_crispy() we can use zip() to get our ducks (initial conditions) in a row, then indexing using offsets to do the "transplanting" of blocks of values:

i_values = np.arange(7)
istarts = (i_values * n2 / 2).astype(int)
for i, istart in zip(i_values, istarts):
    tChunked[i, :n2] = t[istart:istart+n2]

See also

We can see that for

t = np.arange(10000000)
n1 = 7

"extra crispy" is a lot faster than the original (91 vs 4246 ms), but only a little faster than test2() from ken's answer which is not significant considering that it does more careful checking than my brute force treatment.

However "extra crispy" avoids adding an additional axis and expanding the memory usage. For modest arrays this doesn't matter at all. But if you have a huge array then disk space and access time becomes a real problem.

"Just use Jit"?

Zaero Divide's answer points out that @jit goes a long way towards converting impractical python nested loops to fast compiled do loops.

But always check out what methods are already available in NumPy which are already running efficiently with compiled code. One can easily get a gigaflop on a laptop for problems amenable to NumPy's many handy methods.

In my case test_jit() ran four times slower than "extra crispy" (350 vs 91 ms)!

You shouldn't write script that depends on @jit to run fast if you don't need to; not everyone wants to "just use @jit".

Strangely shaped indexing

If you need to address a more random-shaped volume within an array, you can use indexing like this:

array = np.array([[0, 0, 1, 0, 0], [0, 1, 0, 1, 0], [1, 0, 0, 0, 1], [0, 1, 0, 1, 0], [0, 0, 1, 0, 0]])
print(array)

gives

[[0 0 1 0 0]
 [0 1 0 1 0]
 [1 0 0 0 1]
 [0 1 0 1 0]
 [0 0 1 0 0]]

and we can get indices for the 1's like this:

i, j =  np.where(array == 1)
print(i)
print(j)

which gives

If we want to start with a zeroed array and insert those 1's via numpy indexing, just do this

array = np.zeros((5, 5), dtype=int)
array[i, j] = 1

which recreates the original array.


import numpy as np
import matplotlib.pyplot as plt
import time

def test_original(n1, t):
    n2 = int(2*t.size / (n1 + 1))
    tChunked = np.zeros(shape = (n1, n2))
    for i in range(n1):
        istart = int(i * n2 / 2)
        for j in range(0, n2):
            tChunked[i, j] = t[istart + j]
    return  tChunked

t = np.arange(10000000)
n1 = 7

t_start = time.process_time()

tc_original = test_original(n1, t)

print('original process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_original.shape: ', tc_original.shape)

fig, ax = plt.subplots(1, 1)
for thing in tc_original:
    ax.plot(thing)
plt.show()

def test_extra_crispy(n1, t):
    n2 = int(2*t.size / (n1 + 1))
    tChunked = np.zeros(shape = (n1, n2))
    i_values = np.arange(7)
    istarts = (i_values * n2 / 2).astype(int)
    for i, istart in zip(i_values, istarts):
        tChunked[i, :n2] = t[istart:istart+n2]
    return  tChunked


t_start = time.process_time()

tc_extra_crispy = test_extra_crispy(n1, t)

print('extra crispy process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_extra_crispy.shape: ', tc_extra_crispy.shape)

print('np.all(tc_extra_crispy == tc_original): ', np.all(tc_extra_crispy == tc_original))

import math

def test2(n1, t): # https://stackoverflow.com/a/72492815/3904031
    n2 = int(2 * t.size / (n1 + 1))
    istart = np.linspace(0, math.ceil(n1 * n2 / 2), num=n1, endpoint=False, dtype=np.int32)
    jstart = np.linspace(0, n2, num=n2, endpoint=False, dtype=np.int32)
    k = istart[:, np.newaxis] + jstart  # Note: I switched i and j.

    tChunked = t[k]  # This creates an array of the same shape as k.

    return tChunked

t_start = time.process_time()

tc_test2 = test2(n1, t)

print('test2 process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_test2.shape: ', tc_test2.shape)

print('np.all(tc_test2 == tc_original): ', np.all(tc_test2 == tc_original))

# from https://stackoverflow.com/a/72494777/3904031

from numba import njit

@njit
def test_jit(n1,t):
    n2 = int(2*t.size/(n1+1))
    print(n2)
    tChunked = np.zeros(shape = (n1,n2))
    for i in range(0,n1):
        istart = int(i*n2/2)
        for j in range(0,n2):
            tChunked[i,j] = t[istart+j]
    return tChunked


t_start = time.process_time()

tc_jit = test_jit(n1, t)

print('jit process time (ms)', round(1000*(time.process_time() - t_start), 3))
# print('tc_jit.shape: ', tc_jit.shape)

print('np.all(tc_jit == tc_original): ', np.all(tc_jit == tc_original))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文