Concat sorted Dask DataFrames
I have N Dask DataFrames, each sorted by the ts column (which is not the index). I would like to create one DataFrame by concatenating all of them while keeping the result sorted by ts.

Note: ts ranges can overlap between the DataFrames.

Can someone recommend an efficient way to implement this?
UPDATE:
dfs = []
for product in PRODUCTS:
    namespace = PRODUCT_NAMESPACE[product]
    message_type = PRODUCT_MESSAGE_TYPE[product]
    num_expected_channels = PRODUCT_EXPECTED_CHANNELS[product]
    for channel in range(num_expected_channels):
        df = storage.load(
            namespace,
            partition_filter=(P.date == '2022-02-01') & (P.channel == str(channel)),
        )
        df = df.assign(
            product=product,
            message_type=message_type,
        ).astype(
            # astype takes a mapping of column name -> target dtype
            dict(
                product=pd.api.types.CategoricalDtype(PRODUCTS),
                message_type=pd.api.types.CategoricalDtype(['trade', 'quote']),
            )
        ).drop(columns=['channel', 'feed'])
        df = df.set_index('ts', sorted=True, drop=False).persist()
        dfs.append(df)
df = dd.concat(dfs, interleave_partitions=True)
df = df.map_partitions(lambda pdf: pdf.sort_index())
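The code above leans on `interleave_partitions=True` to align partitions across the inputs, after which each partition only needs a local sort. The underlying idea — merging K already-sorted inputs is much cheaper than a full global re-sort — can be sketched with the standard library's `heapq.merge` (the lists below are illustrative stand-ins for the per-product frames, not real data):

```python
import heapq

# Stand-ins for the per-product frames: each list is sorted by
# timestamp, but the ranges overlap between lists.
streams = [
    [1, 4, 7, 10],
    [2, 4, 5, 11],
    [3, 6, 9],
]

# heapq.merge performs an O(n log k) k-way merge of k sorted inputs,
# holding at most one element per input in its internal heap.
merged = list(heapq.merge(*streams))
print(merged)  # [1, 2, 3, 4, 4, 5, 6, 7, 9, 10, 11]
```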
If you can append the DataFrames and still keep them sorted, you should look into dask.dataframe.multi.concat. You should look into dask.dataframe.DataFrame.merge if a simple concatenation would result in a partially sorted DataFrame.

EDIT: Credit to @Michel Delgado, who pointed out that sorting data across partitions without an index would be very memory-intensive. You might want to go through the comments below for more details.