Python 线程与 Pandas Dataframe 并不能提高性能

发布于 2025-01-12 15:42:50 字数 732 浏览 0 评论 0原文

我有一个 200k 行的 Dataframe,我想分成几个部分并为每个分区调用我的函数 S_Function。

def S_Function(df):
    #mycode here 
    return new_df

主程序

N_Threads = 10
Threads = []
Out = []

size = df.shape[0] // N_Threads

for i in range(N_Threads + 1):

    begin = i * size
    end = min(df.shape[0], (i+1)*size)
    Threads.append(Thread(target = S_Function, args = (df[begin:end])) )

我运行线程 & make the join :

for i in range(N_Threads + 1):
    Threads[i].start()

for i in range(N_Threads + 1):
    Out.append(Threads[i].join())

output = pd.concat(Out)

代码工作正常,但问题是使用 threading.Thread 并没有减少执行时间。
序列代码:16 分钟
并行代码:15 分钟

有人可以解释一下需要改进什么吗?为什么效果不好?

i have a Dataframe of 200k lines, i want to split into parts and call my function S_Function for each partition.

def S_Function(df):
    #mycode here 
    return new_df

Main program

N_Threads = 10
Threads = []
Out = []

size = df.shape[0] // N_Threads

for i in range(N_Threads + 1):

    begin = i * size
    end = min(df.shape[0], (i+1)*size)
    Threads.append(Thread(target = S_Function, args = (df[begin:end])) )

I run the threads & make the join :

for i in range(N_Threads + 1):
    Threads[i].start()

for i in range(N_Threads + 1):
    Out.append(Threads[i].join())

output = pd.concat(Out)

The code is working perfectly but the problem is that using threading.Thread did not decrease the execution time.

Sequential Code : 16 minutes
Parallel Code : 15 minutes

Can someone explain what to improve, why this is not working well?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

卷耳 2025-01-19 15:42:50

当您必须处理 CPU 密集型操作时,请勿使用线程。为了实现你的目标,我认为你应该使用 multiprocessing 模块

尝试:

import pandas as pd
import numpy as np
import multiprocessing
import time
import functools

# Modify here
CHUNKSIZE = 20000

def S_Function(df, dictionnary):
    # do stuff here
    new_df = df
    return new_df


if __name__ == '__main__':
    # Load your dataframe
    df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000).tolist()})

    # Create chunks to process
    chunks = (df[i:i+CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))
    dictionnary = {'k1': 'v1', 'k2': 'v2'}
    s_func = functools.partial(S_Function, dictionnary=dictionnary)

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(s_func, chunks)
        out = pd.concat(data)
    end = time.time()

    print(f"Elapsed time: {end - start:.2f} seconds")

Don't use threading when you have to process CPU-bound operations. To achieve your goal, I think you should use multiprocessing module

Try:

import pandas as pd
import numpy as np
import multiprocessing
import time
import functools

# Modify here
CHUNKSIZE = 20000

def S_Function(df, dictionnary):
    # do stuff here
    new_df = df
    return new_df


if __name__ == '__main__':
    # Load your dataframe
    df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000).tolist()})

    # Create chunks to process
    chunks = (df[i:i+CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))
    dictionnary = {'k1': 'v1', 'k2': 'v2'}
    s_func = functools.partial(S_Function, dictionnary=dictionnary)

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(s_func, chunks)
        out = pd.concat(data)
    end = time.time()

    print(f"Elapsed time: {end - start:.2f} seconds")
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文