Concat排序的DASK数据框

发布于 2025-01-23 20:02:45 字数 1124 浏览 0 评论 0原文

我有ts列(无索引)对N dask数据框架进行排序。我想创建一个dataframe- con缩所有它们,但仍然通过此ts列对其进行排序。

注意:ts可以在数据范围之间重叠。

有人可以推荐有效的实施方法吗?

更新:

dfs = []
for product in PRODUCTS:
   namespace = PRODUCT_NAMESPACE[product]
   message_type = PRODUCT_MESSAGE_TYPE[product]

   num_expected_channels = PRODUCT_EXPECTED_CHANNELS[product]
   for channel in range(num_expected_channels):
       df = storage.load(
           namespace,
           partition_filter=(P.date == '2022-02-01') & (P.channel == str(channel)),
       )

       df = df.assign(
          product=product,
          message_type=message_type
       ).astype(
          dict(
              dtype='category',
              product=pd.api.types.CategoricalDtype(PRODUCTS),
              message_type=pd.api.types.CategoricalDtype(['trade', 'quote']),
          )
       ).drop(columns=['channel', 'feed'])

       df = df.set_index('ts', sorted=True, drop=False).persist()

       dfs.append(df)

df = dd.concat(dfs, interleave_partitions=True)
df = df.map_partitions(lambda pdf: pdf.sort_index())

I have N Dask DataFrame sorted by the ts column(no index). I would like to create one DataFrame - concat all of them, but still have it sorted by this ts column.

Note: ts can overlap between DataFrames.

Can someone recommend efficient way to implement it?

UPDATE:

dfs = []
for product in PRODUCTS:
   namespace = PRODUCT_NAMESPACE[product]
   message_type = PRODUCT_MESSAGE_TYPE[product]

   num_expected_channels = PRODUCT_EXPECTED_CHANNELS[product]
   for channel in range(num_expected_channels):
       df = storage.load(
           namespace,
           partition_filter=(P.date == '2022-02-01') & (P.channel == str(channel)),
       )

       df = df.assign(
          product=product,
          message_type=message_type
       ).astype(
          dict(
              dtype='category',
              product=pd.api.types.CategoricalDtype(PRODUCTS),
              message_type=pd.api.types.CategoricalDtype(['trade', 'quote']),
          )
       ).drop(columns=['channel', 'feed'])

       df = df.set_index('ts', sorted=True, drop=False).persist()

       dfs.append(df)

df = dd.concat(dfs, interleave_partitions=True)
df = df.map_partitions(lambda pdf: pdf.sort_index())

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

爱殇璃 2025-01-30 20:02:45

如果您可以附加数据范围并仍在排序,则应查看dask.dataframe.multi.concat

您应该查看dask.dataframe.dataframe.merge如果简单的串联是为了产生部分排序的数据框架。

编辑:信用 @michel delgado 谁指出,在没有索引的情况下对分区进行分类的数据将是非常内存的 - 消费。您可能想仔细阅读下面的评论以查看更多详细信息。

If you can append the dataframes and still be sorted, you should look dask.dataframe.multi.concat.

You should look into dask.dataframe.DataFrame.merge if the simple concatenation was to result in a partially sorted dataframe.

EDIT: Credit to @Michel Delgado who pointed out that sorting data across the partitions without an index would be very memory-consuming. You might want to go through the comments below to see more details.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文