Creating and merging multiple datasets that don't fit into memory: should I use Dask?

Posted 2025-02-13 02:49:27

I'm not quite sure how to ask this question, but I need some clarification on how to make use of Dask's ability to "handle datasets that don't fit into memory", because I'm a little confused on how it works from the CREATION of these datasets.

I have made a reproducible code example below that closely emulates my problem. Although this example DOES fit into my 16 GB of memory, we can assume that it doesn't, because it takes up ALMOST all of my RAM.

I'm working with 1min, 5min, 15min and Daily stock market datasets, all of which have their own technical indicators, so each of these separate dataframes is 234 columns wide, with the 1min dataset having the most rows (521,811) and the others going down from there. Each of these datasets can be created and fit into memory on its own, but here's where it gets tricky.

I'm trying to merge them column-wise into 1 dataframe, with each column name prefixed with its respective timeframe so I can tell them apart, but this creates the memory problem. This is what I'm looking to accomplish, visually:

[image: DesiredOutcome]
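
Just to quantify why the merged frame red-lines my RAM even though each piece fits on its own, here's a rough back-of-envelope (my assumption being that everything ends up as float64 once the outer merges introduce NaNs):

rows = 521_811             # at least the 1-minute rows; the outer-merge union index is a bit larger
cols = 234 * 4             # 1min + 5min + 15min + daily columns
bytes_per_value = 8        # float64 once NaNs appear
print(rows * cols * bytes_per_value / 1e9)  # ~3.9 GB for the final frame alone

...and that's before counting the intermediate copies that merge() and the forward-fill make along the way.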

I'm not really sure if Dask is what I need here, but I assume so. I'm NOT looking to use any kind of "parallel calculations" here (yet), I just need a way to create this dataframe before feeding it into a machine learning pipeline (yes, I know it's a stock market problem, just overlook that for now). I know Dask has a machine learning pipeline I can use, so maybe I'll make use of that in the future, however I need a way to either save this big dataframe to disk, or build it on the fly at the point where I import it.

What I need help with is how to do this. Seeing as each of these datasets fits into memory nicely on its own, an idea I had (and this may not be correct at all, so please let me know) would be to save each of the dataframes to a separate parquet file on disk, then create a Dask dataframe object that imports each of them when I go to start the machine learning pipeline. Something like this:

[image: Idea1]
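
To make that idea concrete, here's a rough sketch of what I'm picturing, using tiny dummy frames and made-up file names; whether an index merge like this is actually the right Dask approach is exactly what I'm unsure about:

from pandas import DataFrame, date_range
from numpy import random
import dask.dataframe as dd

# Step 1: each timeframe fits in memory on its own, so build it with pandas
# and write it to its own parquet file (small dummy frames for illustration).
for freq, name in [("min", "one_min"), ("5min", "five_min")]:
    idx = date_range("2019-12-09 04:00:00", freq=freq, periods=1000)
    df = DataFrame(random.randint(0, 100, size=(len(idx), 5)),
                   columns=[f"{name}_col_{x}" for x in range(5)],
                   index=idx)
    df.to_parquet(f"{name}.parquet")

# Step 2: later, open them lazily as Dask dataframes and merge column-wise
# on the index, so the full merged frame is never materialised at once.
one_min_dd = dd.read_parquet("one_min.parquet")
five_min_dd = dd.read_parquet("five_min.parquet")
merged = one_min_dd.merge(five_min_dd, how="outer",
                          left_index=True, right_index=True)
merged.to_parquet("merged.parquet")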

Is this conceptually correct with what I need to do, or am I way off? haha. I've read through the documentation on Dask, and also checked out this guide specifically, which is good, however as a newbie I need some guidance on how to do this for the first time.

How can I create and save this big merged dataframe to disk, if I can't create it in memory in the first place?

Here is my reproducible dataframe/memory problem code. Be careful when you go to run this, as it'll eat up your RAM pretty quickly. I have 16 GB of RAM and it does run on my fairly light machine, but not without some red-lining of the RAM; I just wanted to give the Dask gods out there something specific to work with. Thanks!

from pandas import DataFrame, date_range, merge
from numpy import random

# ------------------------------------------------------------------------------------------------ #
#                                         1 MINUTE DATASET                                         #
# ------------------------------------------------------------------------------------------------ #
ONE_MIN_NUM_OF_ROWS = 521811
ONE_MIN_NUM_OF_COLS = 234
main_df = DataFrame(random.randint(0,100, size=(ONE_MIN_NUM_OF_ROWS, ONE_MIN_NUM_OF_COLS)), 
                    columns=list("col_" + str(x) for x in range(ONE_MIN_NUM_OF_COLS)),
                    index=date_range(start="2019-12-09 04:00:00", freq="min", periods=ONE_MIN_NUM_OF_ROWS))


# ------------------------------------------------------------------------------------------------ #
#                                         5 MINUTE DATASET                                         #
# ------------------------------------------------------------------------------------------------ #
FIVE_MIN_NUM_OF_ROWS = 117732
FIVE_MIN_NUM_OF_COLS = 234
five_min_df = DataFrame(random.randint(0,100, size=(FIVE_MIN_NUM_OF_ROWS, FIVE_MIN_NUM_OF_COLS)), 
                    columns=list("5_min_col_" + str(x) for x in range(FIVE_MIN_NUM_OF_COLS)),
                    index=date_range(start="2019-12-09 04:00:00", freq="5min", periods=FIVE_MIN_NUM_OF_ROWS))
# Merge the 5 minute to the 1 minute df
main_df = merge(main_df, five_min_df, how="outer", left_index=True, right_index=True, sort=True)


# ------------------------------------------------------------------------------------------------ #
#                                         15 MINUTE DATASET                                        #
# ------------------------------------------------------------------------------------------------ #
FIFTEEN_MIN_NUM_OF_ROWS = 117732
FIFTEEN_MIN_NUM_OF_COLS = 234
fifteen_min_df = DataFrame(random.randint(0,100, size=(FIFTEEN_MIN_NUM_OF_ROWS, FIFTEEN_MIN_NUM_OF_COLS)), 
                    columns=list("15_min_col_" + str(x) for x in range(FIFTEEN_MIN_NUM_OF_COLS)),
                    index=date_range(start="2019-12-09 04:00:00", freq="15min", periods=FIFTEEN_MIN_NUM_OF_ROWS))
# Merge the 15 minute to the main df
main_df = merge(main_df, fifteen_min_df, how="outer", left_index=True, right_index=True, sort=True)


# ------------------------------------------------------------------------------------------------ #
#                                           DAILY DATASET                                          #
# ------------------------------------------------------------------------------------------------ #
DAILY_NUM_OF_ROWS = 933
DAILY_NUM_OF_COLS = 234
daily_df = DataFrame(random.randint(0,100, size=(DAILY_NUM_OF_ROWS, DAILY_NUM_OF_COLS)), 
                    columns=list("daily_col_" + str(x) for x in range(DAILY_NUM_OF_COLS)),
                    index=date_range(start="2019-12-09 04:00:00", freq="D", periods=DAILY_NUM_OF_ROWS))
# Merge the daily to the main df (don't worry about "forward peeking" dates)
main_df = merge(main_df, daily_df, how="outer", left_index=True, right_index=True, sort=True)


# ------------------------------------------------------------------------------------------------ #
#                                            FFILL NAN's                                           #
# ------------------------------------------------------------------------------------------------ #
main_df = main_df.ffill()  # fillna(method="ffill") is deprecated in newer pandas

# ------------------------------------------------------------------------------------------------ #
#                                              INSPECT                                             #
# ------------------------------------------------------------------------------------------------ #
print(main_df)

UPDATE

Thanks to the top answer below, I'm getting closer to my solution.

I've fixed a few syntax errors in the code, and have a working example, UP TO the daily timeframe. When I use the 1B timeframe for upsampling to business days, the error is:

ValueError: <BusinessDay> is a non-fixed frequency

I think it has something to do with this line:

data_index = higher_resolution_index.floor(data_freq).drop_duplicates()

...as that's what I see in the traceback. I don't think pandas likes the 1B timeframe with the floor() function, so is there an alternative?
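
To show exactly where it breaks, here's a tiny snippet reproducing the error; the workaround in it (floor to calendar days, then roll each timestamp back to the previous business day with pandas' BDay offset) is just something I've been toying with, and I'm not sure it's the intended approach:

from pandas import date_range
from pandas.tseries.offsets import BDay

idx = date_range("2019-12-09 04:00:00", freq="1min", periods=10)

# idx.floor("1B")  # raises ValueError: <BusinessDay> is a non-fixed frequency

# Possible workaround: floor to calendar days (a fixed frequency), then snap
# weekend timestamps back to the previous business day.
daily = idx.floor("D")
business_days = daily.map(lambda ts: BDay().rollback(ts)).drop_duplicates()
print(business_days)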

I need to have daily data in there too, however the code works for every other timeframe. Once I can get this daily thing figured out, I'll be able to apply it to my use case.

Thanks!

from pandas import DataFrame, concat, date_range
from numpy import random
import dask.dataframe, dask.delayed

ROW_CHUNK_SIZE = 5000

def load_data_subset(start_date, freq, data_freq, hf_periods):
    higher_resolution_index = date_range(start_date, freq=freq, periods=hf_periods)
    data_index = higher_resolution_index.floor(data_freq).drop_duplicates()
    dummy_response = DataFrame(
        random.randint(0, 100, size=(len(data_index), 234)),
        columns=list(
            f"{data_freq}_col_" + str(x) for x in range(234)
        ),
        index=data_index
    )
    dummy_response = dummy_response.loc[higher_resolution_index.floor(data_freq)].set_axis(higher_resolution_index)
    return dummy_response

@dask.delayed
def load_all_columns_for_subset(start_date, freq, hf_periods):
    return concat(
        [
            load_data_subset(start_date, freq, "1min", hf_periods),
            load_data_subset(start_date, freq, "5min", hf_periods),
            load_data_subset(start_date, freq, "15min", hf_periods),
            load_data_subset(start_date, freq, "1H", hf_periods),
            load_data_subset(start_date, freq, "4H", hf_periods),
            load_data_subset(start_date, freq, "1B", hf_periods),
        ],
        axis=1,
    )

ONE_MIN_NUM_OF_ROWS = 521811
full_index = date_range(
    start="2019-12-09 04:00:00",
    freq="1min",
    periods=ONE_MIN_NUM_OF_ROWS,
)

df = dask.dataframe.from_delayed([
    load_all_columns_for_subset(full_index[i], freq="1min", hf_periods=ROW_CHUNK_SIZE)
    for i in range(0, ONE_MIN_NUM_OF_ROWS, ROW_CHUNK_SIZE)
])

# Save df to parquet here when ready

1 Comment

半夏半凉 2025-02-20 02:49:27

I would take the dask.dataframe tutorial and look at the dataframe best practices guide (https://docs.dask.org/en/latest/dataframe-best-practices.html). dask can generally work with larger-than-memory datasets by one of two approaches:

  1. design your job ahead of time, then iterate through partitions of the data, writing the outputs as you go, so that not all of the data is in memory at the same time.

  2. use a distributed cluster to leverage more (distributed) memory than exists on any one machine.

It sounds like you're looking for approach (1). The actual implementation will depend on how you access/generate the data, but generally I'd say you should not think of the job as "generate the larger-than-memory dataset in memory then dump it into the dask dataframe". Instead, you'll need to think carefully about how to load the data partition-by-partition, so that each partition can work independently.

Modifying your example, the full workflow might look something like this:

import pandas as pd, numpy as np, dask.dataframe, dask.delayed

@dask.delayed
def load_data_subset(start_date, freq, periods):
    # presumably, you'd query some API or something here
    dummy_ind = pd.date_range(start_date, freq=freq, periods=periods)
    dummy_response = pd.DataFrame(
        np.random.randint(0, 100, size=(len(dummy_ind), 234)),
        columns=list("daily_col_" + str(x) for x in range(234)),
        index=dummy_ind
    )
    return dummy_response

# generate a partitioned dataset with a configurable frequency, with each dataframe having a consistent number of rows.
FIFTEEN_MIN_NUM_OF_ROWS = 117732
full_index = pd.date_range(
    start="2019-12-09 04:00:00",
    freq="15min",
    periods=FIFTEEN_MIN_NUM_OF_ROWS,
)
df_15min = dask.dataframe.from_delayed([
    load_data_subset(full_index[i], freq="15min", periods=10000)
    for i in range(0, FIFTEEN_MIN_NUM_OF_ROWS, 10000)
])

You could now write these to disk, concat them, etc., and at any given point each dask worker will only be working with 10,000 rows at a time. Ideally, you'll design the chunks so that each partition holds a couple hundred MB - see the best practices section on partition sizing.
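
If you're not sure how big your partitions end up being, a quick way to check (and adjust) is something like the following sketch; the 200MB target is just an example value, not a recommendation tuned to your data:

# Per-partition memory footprint in MB (each partition is a pandas DataFrame)
sizes = df_15min.map_partitions(
    lambda pdf: pdf.memory_usage(deep=True).sum()
).compute()
print(sizes / 1e6)

# If partitions are far off the target, let dask rebalance them
df_15min = df_15min.repartition(partition_size="200MB")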

This could be extended to include multiple frequencies like this:

import pandas as pd, numpy as np, dask.dataframe, dask.delayed

def load_data_subset(start_date, freq, data_freq, hf_periods):
    # here's your 1min time series *for this partition*
    high_res_ind = pd.date_range(start_date, freq=freq, periods=hf_periods)
    # here's your lower frequency (e.g. 1H, 1day) index 
    # for the same period
    data_ind = high_res_ind.floor(data_freq).drop_duplicates()

    # presumably, you'd query some API or something here. 
    # Alternatively, you could read subsets of your pre-generated 
    # frequency files. this covers the same dates as the 1 minute 
    # dataset, but only has the number of periods in the lower-res
    # time series
    dummy_response = pd.DataFrame(
        np.random.randint(0, 100, size=(len(data_ind), 234)),
        columns=list(
            f"{data_freq}_col_" + str(x) for x in range(234)
        ),
        index=data_ind
    )

    # now, reindex to the shape of the new data (this does the
    # forward fill step):
    dummy_response = (
        dummy_response
        .loc[high_res_ind.floor(data_freq)]
        .set_axis(high_res_ind)
    )

    return dummy_response

@dask.delayed
def load_all_columns_for_subset(start_date, periods):
    return pd.concat(
        [
            load_data_subset(start_date, "1min", "1min", periods),
            load_data_subset(start_date, "1min", "5min", periods),
            load_data_subset(start_date, "1min", "15min", periods),
            load_data_subset(start_date, "1min", "D", periods),
        ],
        axis=1,
    )

# generate a partitioned dataset with all columns, where lower 
# frequency columns have been ffilled, with each dataframe having
# a consistent number of rows.
ONE_MIN_NUM_OF_ROWS = 521811
full_index = pd.date_range(
    start="2019-12-09 04:00:00",
    freq="1min",
    periods=ONE_MIN_NUM_OF_ROWS,
)
df_full = dask.dataframe.from_delayed([
    load_all_columns_for_subset(full_index[i], periods=10000)
    for i in range(0, ONE_MIN_NUM_OF_ROWS, 10000)
])

This runs straight through for me. It also exports the full dataframe just fine if you call df_full.to_parquet(filepath) right after this. I ran this with a dask.distributed scheduler (running on my laptop), kept an eye on the dashboard, and total memory never exceeded 3.5 GB.
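
For completeness, the scheduler setup I mean is roughly the following, continuing from the df_full built above; the worker count, memory limit and output path are placeholder values, not requirements:

from dask.distributed import Client

# Start a local distributed scheduler so you can watch the dashboard while
# writing; in a plain script you may need an `if __name__ == "__main__":` guard.
client = Client(n_workers=4, memory_limit="4GB")  # placeholder values
print(client.dashboard_link)

# Writes the dataframe out partition by partition, never materialising it whole
df_full.to_parquet("df_full.parquet")  # placeholder path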

Because there are so many columns, the dask.dataframe preview is a bit unwieldy, but here's the head and tail:

In [10]: df_full.head()
Out[10]:
                     1min_col_0  1min_col_1  1min_col_2  1min_col_3  1min_col_4  1min_col_5  1min_col_6  1min_col_7  ...  D_col_226  D_col_227  D_col_228  D_col_229  D_col_230  D_col_231  D_col_232  D_col_233
2019-12-09 04:00:00          88          36          34          57          54          98           4          92  ...         84          3         49         29         62         47         21         21
2019-12-09 04:01:00          89          61          50           2          73          44          49          33  ...         84          3         49         29         62         47         21         21
2019-12-09 04:02:00           9          18          73          76          28          17          10          49  ...         84          3         49         29         62         47         21         21
2019-12-09 04:03:00          59          73          92          28          32           8          24          85  ...         84          3         49         29         62         47         21         21
2019-12-09 04:04:00          40          54          23           5          52          63          61          64  ...         84          3         49         29         62         47         21         21

[5 rows x 936 columns]

In [11]: df_full.tail()
Out[11]:
                     1min_col_0  1min_col_1  1min_col_2  1min_col_3  1min_col_4  1min_col_5  1min_col_6  1min_col_7  ...  D_col_226  D_col_227  D_col_228  D_col_229  D_col_230  D_col_231  D_col_232  D_col_233
2020-12-11 05:15:00          81           8          51           2          77          26          66          23  ...         15         51         66         26         88         85         91         65
2020-12-11 05:16:00          67          68          34          58          43          40          76          72  ...         15         51         66         26         88         85         91         65
2020-12-11 05:17:00          93          66          21          39          12          96          53           4  ...         15         51         66         26         88         85         91         65
2020-12-11 05:18:00          69           9          69          41           5           6           6          37  ...         15         51         66         26         88         85         91         65
2020-12-11 05:19:00          18          50          25          74          78          51          10          83  ...         15         51         66         26         88         85         91         65

[5 rows x 936 columns]