Performance tuning Expanding().mean() on groups


Having the following DF of user events:

    id                timestamp
0    1  2021-11-23 11:01:00.000
1    1  2021-11-23 11:02:00.000
2    1  2021-11-23 11:10:00.000
3    1  2021-11-23 11:11:00.000
4    1  2021-11-23 11:22:00.000
5    1  2021-11-23 11:40:00.000
6    1  2021-11-23 11:41:00.000
7    1  2021-11-23 11:42:00.000
8    1  2021-11-23 11:43:00.000
9    1  2021-11-23 11:44:00.000
10   2  2021-11-23 11:01:00.000
11   2  2021-11-23 11:02:00.000
12   2  2021-11-23 11:10:00.000
13   2  2021-11-23 11:11:00.000
14   2  2021-11-23 11:22:00.000
15   2  2021-11-23 11:40:00.000
16   2  2021-11-23 11:41:00.000
17   2  2021-11-23 11:42:00.000
18   2  2021-11-23 11:43:00.000
19   2  2021-11-23 11:44:00.000

I calculate the average session time per row as follows:

  1. Each session is a sequence of events less than 5 minutes apart.
  2. I calculate the number of seconds between the first event in the session and the current event.
  3. I then calculate expanding().mean() for each user.

Here is my code:

import pandas as pd

def average_session_time(**kwargs):
    df = kwargs['df'].copy()
    df['timestamp'] = pd.to_datetime(df.timestamp)
    df['session_grp'] = df.groupby('id').apply(
        lambda x: (x.groupby([pd.Grouper(key="timestamp", freq='5min', origin='start')])).ngroup()).reset_index(
        drop=True).values.reshape(-1)

    # Getting relevant 5min groups
    ng = df.groupby(['id', 'session_grp'])
    df['fts'] = ng['timestamp'].transform('first')
    df['delta'] = df['timestamp'].sub(df['fts']).dt.total_seconds()

    return df.groupby('id')['delta'].expanding().mean().reset_index(drop=True)
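
For reference, a call along these lines (a hedged guess at the driver code, assuming the 20 events above are already loaded in a dataframe named df) produces the output shown next:

result = average_session_time(df=df)
print(result)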

And the output is:

0      0.000000
1     30.000000
2     20.000000
3     15.000000
4     12.000000
5     10.000000
6      8.571429
7     15.000000
8     26.666667
9     42.000000
10     0.000000
11    30.000000
12    20.000000
13    15.000000
14    12.000000
15    10.000000
16     8.571429
17    15.000000
18    26.666667
19    42.000000
Name: delta, dtype: float64

The code works fine, but when it's run on a large data set, the performance suffers and it takes a long time to calculate. I tried tweaking the code, but couldn't gain more performance. How can I write this function differently to improve performance?

Here is a Colab with the running code.


追风人 2025-01-18 07:17:14


One very fast solution is to use Numpy and Numba together to group the consecutive rows the way you do.

First of all, the columns need to be converted to native Numpy types, since CPython objects are very slow to work with (and also take more memory). You can do that with:

ids = df['Id'].values.astype('S32')
timestamps = df['timestamp'].values.astype('datetime64[ns]')

This assumes that the IDs are composed of at most 32 ASCII characters. If IDs can contain Unicode characters, you can use 'U32' instead (a bit slower). You can also use np.unicode_ to let Numpy find the bound for you; however, this is significantly slower (since Numpy needs to parse all the strings twice).
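
As a sketch of that alternative (assuming IDs of at most 32 code points; df and the column name 'Id' are taken from the snippet above):

# If IDs may contain non-ASCII characters (assumed limit: 32 code points per ID)
ids = df['Id'].values.astype('U32')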

Once converted to datetime64[ns], the timestamps can be cast to 64-bit integers (nanoseconds since the epoch), which Numba can process very quickly.

Then, the idea is to transform the string IDs into basic integers, since working with strings is pretty slow. You can do that by searching for adjacent strings that are different, so as to locate blocks sharing the same ID:

ids_int = np.insert(np.cumsum(ids[1:] != ids[:-1], dtype=np.int64), 0, 0)

Note that in the provided dataset no group of rows sharing the same ID is interrupted by a row with a different ID. If this assumption does not always hold, you can sort the input strings (ids) with np.argsort(ids, kind='stable'), apply this solution, and then reorder the result based on the output of np.argsort. Sorting the strings is a bit slow, but still much faster than the computation time of the solution provided in the question (about 100-200 ms on my machine).
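
A minimal sketch of that fallback, assuming fast_average_session_time from the complete solution below and a dataframe df whose rows may interleave IDs:

import numpy as np

# Make equal IDs contiguous (the stable sort keeps the timestamp order
# within each ID), compute, then scatter the result back.
ids = df['Id'].values.astype('S32')
order = np.argsort(ids, kind='stable')
sorted_result = fast_average_session_time(df.iloc[order])
result = np.empty_like(sorted_result)
result[order] = sorted_result  # restore the original row order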

Finally, you can compute the result with Numba using basic loops.


Complete solution

Here is the resulting code:

import pandas as pd
import numpy as np
import numba as nb

@nb.njit('float64[:](int64[::1], int64[::1])')
def compute_result(ids, timestamps):
    n = len(ids)
    result = np.empty(n, dtype=np.float64)
    if n == 0:
        return result
    id_group_first_timestamp = timestamps[0]
    session_group_first_timestamp = timestamps[0]
    id_group_count = 1
    id_group_delta_sum = 0.0
    last_session_group = 0
    result[0] = 0
    delay = np.int64(300e9) # 5 min (in ns)
    for i in range(1, n):
        # If there is a new group of IDs
        if ids[i-1] != ids[i]:
            id_group_first_timestamp = timestamps[i]
            id_group_delta_sum = 0.0
            id_group_count = 1
            last_session_group = 0
            session_group_first_timestamp = timestamps[i]
        else:
            id_group_count += 1
        session_group = (timestamps[i] - id_group_first_timestamp) // delay
        # If there is a new session group
        if session_group != last_session_group:
            session_group_first_timestamp = timestamps[i]
        delta = (timestamps[i] - session_group_first_timestamp) * 1e-9
        id_group_delta_sum += delta
        result[i] = id_group_delta_sum / id_group_count
        last_session_group = session_group
    return result

def fast_average_session_time(df):
    ids = df['Id'].values.astype('S32')
    timestamps = df['timestamp'].values.astype('datetime64[ns]').astype(np.int64)
    ids_int = np.insert(np.cumsum(ids[1:] != ids[:-1], dtype=np.int64), 0, 0)
    return compute_result(ids_int, timestamps)

Note that the output is a Numpy array and not a dataframe, but you can easily build a dataframe from the Numpy array with pd.DataFrame(result).
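
For instance, a small wrapper like this (a hypothetical convenience helper, not part of the solution above) keeps the original index:

import pandas as pd

def average_session_time_fast(df):
    # Align the raw Numpy result with the dataframe's index.
    return pd.Series(fast_average_session_time(df), index=df.index, name='delta')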


Benchmark

Here is the resulting performance on my machine:

Initial solution:            12_537 ms   (   x1.0)
Scott Boston's solution:      7_431 ms   (   x1.7)
This solution:                   64 ms   ( x195.9)

Time taken by compute_result only:    2.5 ms

Thus, this solution is nearly 200 times faster than the initial one.

Note that about 85% of the time is spent in the unicode/datetime string parsing, which can hardly be optimized. Indeed, this processing is slow because dealing with short unicode strings is inherently expensive on modern processors, and because CPython objects introduce a significant overhead (e.g. reference counting). Moreover, this processing cannot be parallelized efficiently because of the CPython GIL and slow inter-process communication. Thus, this code is certainly almost optimal (as long as you use CPython).
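
If you want to check that split on your own data, a rough timing sketch (hypothetical measurement code, reusing df and the functions defined above) could look like this:

import time

t0 = time.perf_counter()
ids = df['Id'].values.astype('S32')
timestamps = df['timestamp'].values.astype('datetime64[ns]').astype(np.int64)
t1 = time.perf_counter()
ids_int = np.insert(np.cumsum(ids[1:] != ids[:-1], dtype=np.int64), 0, 0)
result = compute_result(ids_int, timestamps)
t2 = time.perf_counter()
print(f"string/datetime conversion: {t1 - t0:.3f} s, Numba computation: {t2 - t1:.3f} s")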

油焖大侠 2025-01-18 07:17:14


I think I was able to cut your time in half by passing an explicit format to pd.to_datetime and by using the as_index parameter in groupby instead of calling reset_index:

def average_session_time(**kwargs):
    df = kwargs['df'].copy()
    df['timestamp'] = pd.to_datetime(df.timestamp, format='%Y-%m-%d %H:%M:%S.%f')
    grp_id = df.groupby('Id', as_index=False)
    df['session_grp'] = grp_id.apply(
        lambda x: (x.groupby([pd.Grouper(key="timestamp", freq='5min', origin='start')])).ngroup()).values.reshape(-1)

    # Getting relevant 5min groups
    ng = df.groupby(['Id', 'session_grp'])
    df['fts'] = ng['timestamp'].transform('first')
    df['delta'] = df['timestamp'].sub(df['fts']).dt.total_seconds()

    return grp_id['delta'].expanding().mean().reset_index(level=0, drop=True)

Original Timing:

40.228641986846924

New Timing:

16.08320665359497
