PySpark parquet file overwrite after transformation

Posted 2025-01-20 04:50:06


I'm trying to make transformations (union two tables, or add a column) on a parquet file and then save it, but I run into a FileNotFound error.

The snippet to reproduce my error is the following:

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd

spark = SparkSession.builder.appName('test').getOrCreate()

# create some data
a = spark.createDataFrame(pd.DataFrame({
    'x': [1, 2, 3], 'y':[1, 2, 3]
}))
a.write.mode('overwrite').parquet('./a')

# make some transformation onto the data
a = spark.read.parquet('./a')
b = spark.read.parquet('./a')
# such as a union; the same error occurs if the following is replaced by spark.sql()
c = a.union(b).withColumn('id', monotonically_increasing_id())
c.show()

# overwrite (this is where I get the FileNotFound error)
# one can see the parquet-xxxxxx.snappy.parquet files get deleted during the overwrite
c.write.mode('overwrite').parquet('./a')

The error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 2 in stage 105.0 failed 1 times, most recent failure:
Lost task 2.0 in stage 105.0 (TID 265) (suweiguodembp.home executor
driver): java.io.FileNotFoundException: File file:/data-xxx-xxxxxxx.snappy.parquet

My goal is to modify some rows of the data for a new application. But why can't it do a union or add a column? What is the correct way to append records to parquet files?

Comments (1)

三生一梦 2025-01-27 04:50:06


The philosophy of Spark is not to overwrite data. The very first thing mode('overwrite') does is delete the path, and since you're reading from and writing to the same path, you lose your data before it can be rewritten. You'll want to apply the transformation and write to another location (./temp, for example), then use the 'cleaned' data from the new location. If you need to, you can then move it back to the original location (./a).
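A minimal sketch of that two-step write, following the suggestion above (the ./a_tmp path is just a placeholder name chosen for illustration, not anything required by Spark):

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.appName('test').getOrCreate()

# read the existing data and build the transformed DataFrame as before
a = spark.read.parquet('./a')
b = spark.read.parquet('./a')
c = a.union(b).withColumn('id', monotonically_increasing_id())

# 1) write the result to a *different* path first
c.write.mode('overwrite').parquet('./a_tmp')

# 2) re-read from the new location, so the plan no longer depends on files under ./a
clean = spark.read.parquet('./a_tmp')

# 3) now it is safe to overwrite the original path
clean.write.mode('overwrite').parquet('./a')

The key point is step 2: after the re-read, the write in step 3 only depends on the files in ./a_tmp, so deleting ./a during the overwrite no longer breaks the job.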
