PySpark DataFrame saving incorrectly to Parquet?

Posted 2025-02-12 16:23:07

I'm running into a perplexing problem when saving a dataframe to a parquet file after performing a series of transformations.

After I run a series of transformation steps, I check that the final dataframe is correct by checking its size and looking for duplicates.

result_df.count()

213051

result_df.select(count_distinct("RECORD_ID")).show()
+-------------------------+
|count(DISTINCT RECORD_ID)|
+-------------------------+
|                   213051|
+-------------------------+
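
For what it's worth, the duplicate check itself is nothing fancy; a minimal sketch of the kind of check I run (same RECORD_ID column as above) looks like this:

from pyspark.sql import functions as F

# Group on the key column and keep any key that appears more than once
dupes = (
    result_df
    .groupBy("RECORD_ID")
    .count()
    .filter(F.col("count") > 1)
)
dupes.show()  # no duplicate keys show up before the write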

I then write the dataframe to Parquet using the DataFrameWriter API

result_df.write.parquet('path')

The issue arises when I try to read the processed data back later

processed_df = spark.read.parquet('path') 
processed_df.count()

213051

processed_df.select(count_distinct("RECORD_ID")).show()
+-------------------------+
|count(DISTINCT RECORD_ID)|
+-------------------------+
|                     6659|
+-------------------------+
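
To see the scale of the problem, a quick sketch like the following (again on the RECORD_ID column) shows that the reloaded data repeats the same keys many times over:

from pyspark.sql import functions as F

# Count rows per key in the reloaded data and look at the most repeated ones
(
    processed_df
    .groupBy("RECORD_ID")
    .count()
    .orderBy(F.col("count").desc())
    .show(10)
)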

Does anyone have an idea why this may be occurring, and why duplicates are being introduced into the dataframe when saving? I'm not sure if there is an option I'm missing when saving the dataframe that would correct this behavior. I've checked the final df multiple times for duplicates and incorrect data, but it always looks good until I save and reload it.

The cluster is a driver + 3 workers on Azure Databricks (Spark 3.2.1), if that helps.

Thanks!

EDIT:

I've been able to narrow the issue down to a dataframe that I join onto the main dataframe in the transformation steps. The df in question consists of a file_name column and a raw_text column, which contains a large string for each row.
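
For context, a simplified sketch of that join is below; the real pipeline has more steps, and the dataframe names and the file_name join key are just my shorthand here:

# raw_text_df is the file_name / raw_text dataframe described above;
# joining on file_name is assumed purely for illustration
result_df = main_df.join(raw_text_df, on="file_name", how="left")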

For reference, I'm already running the following UDF to help with non-ASCII characters:

from pyspark.sql.functions import udf

def ascii_ignore(x):
    # Drop any characters that cannot be encoded as ASCII
    return x.encode('ascii', 'ignore').decode('ascii')

ascii_udf = udf(ascii_ignore)
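
It gets applied to the raw text column roughly like this (a sketch; raw_text is the column described above, and note the UDF as written would fail on NULL values):

from pyspark.sql.functions import col

# Apply the UDF to the large-string column before joining
raw_text_df = raw_text_df.withColumn("raw_text", ascii_udf(col("raw_text")))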

Is this an issue with the Parquet file not being able to interpret the special characters, and thus overwriting rows with the previously good rows?
