map_partitions runs twice when storing a dask dataframe to parquet and counting the records

Posted 2025-01-14 21:12:02


I have a dask process that runs a function on each dataframe partition. I let to_parquet do the compute() that runs the function.

But I also need to know the number of records in the parquet table. For that, I use ddf.map_partitions(len). The problem is that counting the records triggers another compute() on the dataframe, which makes the map_partitions function run again.

What should be the approach to run map_partitions, save the result in parquet, and count the number of records?

from dask.distributed import Client

def some_func(df):
    df['abc'] = df['def'] * 10
    return df

client = Client('127.0.0.1:8786')

# ddf is a dask dataframe created earlier
ddf.map_partitions(some_func)  # some_func executes twice for each partition

ddf.to_parquet('/some/folder/data', engine='pyarrow')

total = ddf.map_partitions(len).compute().sum()


Comments (1)

極樂鬼 2025-01-21 21:12:02


One potential problem is this line:

ddf.map_partitions(some_func)

Here, the instruction to dask is to map partitions, but there is no instruction to store the results of this operation. Hence, the code should probably be:

# this will store the modified dataframe as ddf
ddf = ddf.map_partitions(some_func)

Next, running .to_parquet will evaluate (compute) the dataframe ddf and then discard it from memory, so the subsequent .compute() will re-compute the dataframe.

This can be expensive, so some of the possible solutions are:

  1. If ddf can fit into the workers' memory, it can be persisted to avoid recomputing it:
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow') 
total = ddf.map_partitions(len).compute().sum()
  2. Re-define ddf in terms of the computed data. Here, dask will use the stored parquet files (which were already computed) and load the information from them (a cheaper way to get the count is sketched after this list):
ddf = ddf.map_partitions(some_func)
ddf = ddf.persist()
ddf.to_parquet('/some/folder/data', engine='pyarrow')

# create a new ddf based on the computed values
ddf = dd.read_parquet('/some/folder/data')
total = ddf.map_partitions(len).compute().sum()
  3. Another solution could be to modify some_func to store the results on the fly and return the partition length. The rough pseudocode is:
path_out = 'some/template_path/for/parquet/{number}.parquet'

def some_func(df, partition_info=None):
    df['abc'] = df['def'] * 10
    # partition_info may be None while dask infers metadata on an empty frame,
    # so only write when a real partition is being processed
    if partition_info is not None:
        # this uses the partition number to create an appropriate path
        path_save_parquet = path_out.format(number=partition_info['number'])
        df.to_parquet(path_save_parquet)
    return len(df)

# this will compute the total length and save the data as a side effect
total = ddf.map_partitions(some_func).compute().sum()
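A further option, not part of the original answer, is to keep the parquet write lazy and compute it together with the row count in a single pass, so the shared map_partitions work runs only once. A minimal sketch, assuming dask.dataframe's to_parquet(..., compute=False), which returns a lazy task instead of writing immediately:

import dask

ddf = ddf.map_partitions(some_func)

# build the write as a lazy task instead of executing it right away
write_task = ddf.to_parquet('/some/folder/data', engine='pyarrow', compute=False)

# lazy per-partition row counts
counts = ddf.map_partitions(len)

# a single compute() for both graphs, so some_func runs once per partition
_, partition_counts = dask.compute(write_task, counts)
total = partition_counts.sum()

The cheaper count mentioned in option 2 could read the row counts from the parquet footers instead of re-reading the data. A sketch, assuming pyarrow is installed and that dask wrote one .parquet file per partition into the output directory (its default layout):

import glob
import pyarrow.parquet as pq

# sum num_rows from each file's footer; no column data is read
total = sum(
    pq.ParquetFile(path).metadata.num_rows
    for path in glob.glob('/some/folder/data/*.parquet')
)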