map_partitions runs twice when storing a dask dataframe in parquet and counting records
I have a dask process that runs a function on each dataframe partition. I let to_parquet do the compute() that runs the functions.
But I also need to know the number of records in the parquet table. For that, I use ddf.map_partitions(len). The problem is that when I count the number of records, a compute() is done again on the dataframe, and that makes the map_partitions function run again.
What should be the approach to run map_partitions, save the result in parquet, and count the number of records?
def some_func(df):
    df['abc'] = df['def'] * 10
    return df

client = Client('127.0.0.1:8786')

ddf.map_partitions(some_func)  # some_func executes twice for each partition
ddf.to_parquet('/some/folder/data', engine='pyarrow')
total = ddf.map_partitions(len).compute().sum()
One potential problem is this line:
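ddf.map_partitions(some_func)  # some_func executes twice for each partition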
Here, the instruction to dask is to map partitions, but there is no instruction to store the results of this operation. Hence, the code should probably be:
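ddf = ddf.map_partitions(some_func)  # assign the result, so later operations use the transformed dataframe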
Next, running .to_parquet will evaluate (compute) the dataframe ddf and discard it from memory, so the subsequent .compute will re-compute the dataframe.
This can be expensive, so some of the possible solutions are:
- if ddf can fit into the memory of the workers, then ddf could be persisted to avoid recomputing (see the first sketch after this list);
- re-defining ddf in terms of the computed data. Here, dask will use the stored parquet files (that were computed) and load information from them (see the second sketch below);
- modifying some_func to store the results on the fly and return len. The rough pseudocode is in the last sketch below.
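A minimal sketch of the persist option, reusing the ddf, client and path from the question (the names come from the question; the exact call sequence is an assumption):

ddf = ddf.map_partitions(some_func).persist()           # keep the computed partitions on the workers
ddf.to_parquet('/some/folder/data', engine='pyarrow')   # writes the already-computed partitions
total = ddf.map_partitions(len).compute().sum()         # reuses the persisted partitions, no recompute

A sketch of the second option, re-defining the dataframe in terms of the stored parquet files; dd.read_parquet is the standard dask API for this, and the path is the one from the question:

import dask.dataframe as dd

ddf = ddf.map_partitions(some_func)
ddf.to_parquet('/some/folder/data', engine='pyarrow')   # computes some_func once and stores the result
ddf_stored = dd.read_parquet('/some/folder/data')       # re-define the dataframe from the stored files
total = ddf_stored.map_partitions(len).compute().sum()  # counting now only reads the parquet data

And a rough sketch of the last option, modifying some_func so each partition is written as soon as it is transformed and only its length is returned; the per-partition file naming via partition_info and the explicit meta are assumptions, not part of the original answer:

import os

def some_func_store(df, partition_info=None):
    df['abc'] = df['def'] * 10
    # write this partition immediately, with a hypothetical name based on the partition number
    number = partition_info['number'] if partition_info else 0
    df.to_parquet(os.path.join('/some/folder/data', f'part.{number}.parquet'), engine='pyarrow')
    return len(df)                                      # only the record count travels back

# each partition is stored exactly once; the computed values are just the partition lengths
total = ddf.map_partitions(some_func_store, meta=(None, 'int64')).compute().sum()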