Can these to_csv calls be written so that they write the dataframes to CSV files concurrently?

Posted 2025-02-12 11:14:56

if len(isd) != 0:
    isd.to_csv("Issuedate.csv")
if len(ind) != 0:
    ind.to_csv("Inceptiondatecsv")
if len(exd) != 0:
    exd.to_csv("Expirydate.csv")
if len(psd) != 0:
    psd.to_csv("policystatedate.csv")
if len(visd) != 0:
    visd.to_csv("vehicleissuedate.csv")
if len(vind) != 0:
    vind.to_csv("vehicleinceptiondate.csv")
if len(vexd) != 0:
    vexd.to_csv("vehicleexpirydate.csv")
if len(sd) != 0:
    sd.to_csv("statusdate.csv")
if len(ise) != 0:
    ise.to_csv("istemarhexpiry.csv")
if len(idb) != 0:
    idb.to_csv("insureddateofbirth.csv")
if len(mdd) != 0:
    mdd.to_csv("maindriverdob.csv")
if len(add) != 0:
    add.to_csv("adddriverdob.csv")


Can these be sped up using multithreading or multiprocessing? I'm pretty new to Python and want to write the data to the CSV files concurrently.


Comments (1)

冰葑 2025-02-19 11:14:57


There are several issues. I could not find any documentation on whether the to_csv DataFrame method releases the GIL. If we knew that it did, we would want to use a multithreading pool: we would still get parallel processing, and we would avoid the overhead of passing a (large?) dataframe from the main process's address space to a pool child process's address space (via pickle). But regardless of whether we use multithreading or multiprocessing, you would be attempting multiple file creations in parallel. Depending on whether you have a solid-state drive and what its characteristics are, this could be counterproductive (excessive head movement on a spinning disk), and you only have so much I/O bandwidth in any case. So, assuming we have to use the more costly multiprocessing pool, it is not at all clear how much performance would improve, if at all.
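For completeness: if it turned out that to_csv does release the GIL during the file I/O, the multithreading pool mentioned above might look like the sketch below. This is a hypothetical illustration, not something verified against pandas internals; save_df is the same trivial wrapper used in main1 further down, and filtered_dfs is assumed to be a non-empty list of (dataframe, filename) pairs.

from concurrent.futures import ThreadPoolExecutor

def save_df(df, csv_name):
    df.to_csv(csv_name)

def save_all_threaded(filtered_dfs):
    # One worker thread per file. Threads share the process's memory,
    # so no dataframe has to be pickled over to a child process.
    # Assumes filtered_dfs is non-empty (max_workers must be >= 1).
    with ThreadPoolExecutor(max_workers=len(filtered_dfs)) as executor:
        futures = [executor.submit(save_df, df, name)
                   for df, name in filtered_dfs]
        for future in futures:
            future.result()  # re-raise any exception from a worker thread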

But here is how you might do it with a multiprocessing pool (function main1). I have also added function main2, which does the same processing serially, and I time both. I only have two trivial dataframes and a non-solid-state drive, so the resulting numbers may be quite different in your case:

def save_df(df, csv_name):
    df.to_csv(csv_name)


def main1():
    import time
    import pandas as pd
    from multiprocessing import Pool, cpu_count

    df1 = pd.DataFrame({
        'Name': ['Tom', 'Dick', 'Harry', 'Jane'],
        'Age': [10, 20, 30, 39],
        'Sex': ['M', 'M', 'M', 'F']
    })

    df2 = pd.DataFrame({
        'Name': ['X', 'Y', 'Z'],
        'Age': [10, 20, 30],
        'Sex': ['F', 'M', 'F']
    })

    # Somehow we have a list of tuples consisting of dataframe, csv-name pairs
    dfs = [(df1, 'df1.csv'), (df2, 'df2.csv')]

    # Filter these so that we do not unproductively submit 0-length dataframes:
    filtered_dfs = list(filter(lambda t: len(t[0]), dfs))
    pool_size = min(len(filtered_dfs), cpu_count())
    t = time.time()
    with Pool(pool_size) as pool:
        pool.starmap(save_df, filtered_dfs)
    print(time.time() - t)

def main2():
    import time
    import pandas as pd

    df1 = pd.DataFrame({
        'Name': ['Tom', 'Dick', 'Harry', 'Jane'],
        'Age': [10, 20, 30, 39],
        'Sex': ['M', 'M', 'M', 'F']
    })

    df2 = pd.DataFrame({
        'Name': ['X', 'Y', 'Z'],
        'Age': [10, 20, 30],
        'Sex': ['F', 'M', 'F']
    })

    # Somehow we have a list of tuples consisting of dataframe, csv-name pairs
    dfs = [(df1, 'df1.csv'), (df2, 'df2.csv')]

    # Filter these so that we do not unproductively submit 0-length dataframes:
    filtered_dfs = list(filter(lambda t: len(t[0]), dfs))
    t = time.time()
    for df, csv in filtered_dfs:
        save_df(df, csv)
    print(time.time() - t)

if __name__ == '__main__':
    main1()
    main2()

Prints:

0.6069865226745605
0.002997159957885742

Serial processing is much more performant here because the overhead of multiprocessing dwarfs the trivial amount of work the save_df worker function does for such small dataframes. As the work save_df has to do grows with larger dataframes, multiprocessing may start to look better.
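Applied back to the question, the twelve if statements collapse into a single list of (dataframe, filename) pairs that can be fed to the pool. A minimal sketch, assuming isd, ind, and the other variables from the question already hold DataFrames, and reusing the save_df/Pool approach from main1:

from multiprocessing import Pool, cpu_count

# Defined at module top level so pool child processes can import it.
def save_df(df, csv_name):
    df.to_csv(csv_name)

if __name__ == '__main__':
    # The question's dataframes (assumed to exist) paired with their
    # target filenames.
    dfs = [
        (isd, "Issuedate.csv"),
        (ind, "Inceptiondate.csv"),
        (exd, "Expirydate.csv"),
        (psd, "policystatedate.csv"),
        (visd, "vehicleissuedate.csv"),
        (vind, "vehicleinceptiondate.csv"),
        (vexd, "vehicleexpirydate.csv"),
        (sd, "statusdate.csv"),
        (ise, "istemarhexpiry.csv"),
        (idb, "insureddateofbirth.csv"),
        (mdd, "maindriverdob.csv"),
        (add, "adddriverdob.csv"),
    ]

    # Skip empty dataframes, exactly as the original if statements did.
    filtered_dfs = [(df, name) for df, name in dfs if len(df)]

    if filtered_dfs:
        with Pool(min(len(filtered_dfs), cpu_count())) as pool:
            pool.starmap(save_df, filtered_dfs)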
