Spark inconsistency with unusually encoded CSV files

Posted 2025-02-12 11:31:52


Context:

  • As part of a data pipeline, I am working with some flat CSV files
  • Those files have unusual encoding and escaping rules
  • My intention is to preprocess them and convert them to Parquet for subsequent pipeline steps

MCVE:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("...").getOrCreate()

min_schema = StructType(
    [
        StructField("dummy_col", StringType(), True),
        StructField("record_id", IntegerType(), nullable=False),
        StructField("dummy_after", StringType(), nullable=False),
    ]
)


df = (
    spark.read.option("mode", "FAILFAST")
    .option("quote", '"')
    .option("escape", '"')  # the quote character doubles as the escape character (RFC 4180 style)
    .option("inferSchema", "false")
    .option("multiLine", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .schema(min_schema)
    .csv("min_repro.csv", header=True)
)

min_repro.csv:

dummy_col,record_id,dummy_after
"",1,", Unusual value with comma included"
B,2,"Unusual value with escaped quote and comma ""like, this"

CSV parses fine:

df.collect()

[Row(dummy_col=None, record_id=1, dummy_after=', Unusual value with comma included'),
Row(dummy_col='B', record_id=2, dummy_after='Unusual value with escaped quote and comma "like, this')]

Yet trivial Spark code on the same DF fails with an obscure error:

if df.count() != df.select('record_id').distinct().count():
    pass
Py4JJavaError: An error occurred while calling o357.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 1 times, most recent failure: Lost task 0.0 in stage 17.0 (TID 13, localhost, executor driver): org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
...
Caused by: java.lang.NumberFormatException: For input string: "Unusual value with comma included""
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

I don't understand how .collect() on the same DF can return the correct rows, yet any query on that same DF fails.

An upstream bug has been filed: https://issues.apache.org/jira/browse/SPARK-39842
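
One plausible explanation, stated here as an assumption rather than a confirmed diagnosis: the stack trace shows the dummy_after text being fed to the IntegerType record_id column, so the failing path is not splitting fields the same way. collect() reads every column, while count() and select('record_id') let Spark prune columns before the CSV parser runs, and the pruned parse appears to mishandle the quote-used-as-escape sequence. Below is a minimal sketch of a commonly suggested mitigation, disabling CSV column pruning; it reuses spark and min_schema from the MCVE above, and whether it actually fixes this particular file is untested here.

# Possible workaround sketch (assumption: the collect()/count() mismatch is
# caused by CSV column pruning). spark.sql.csv.parser.columnPruning.enabled is
# an existing Spark SQL setting (default true); disabling it makes every scan
# parse all columns, the same work collect() already does.
spark.conf.set("spark.sql.csv.parser.columnPruning.enabled", "false")

df = (
    spark.read.option("mode", "FAILFAST")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", "true")
    .schema(min_schema)
    .csv("min_repro.csv", header=True)
)

# With pruning disabled, the count-based check below is expected not to hit
# the NumberFormatException, but that expectation is itself an assumption.
if df.count() != df.select("record_id").distinct().count():
    pass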

Comments (1)

夜无邪 2025-02-19 11:31:52

The correct way to ignore a comma within the data is to:

  1. Enclose the data within double quotes.
  2. Use the option "escapeQuotes", "true".
df = (
    spark.read.option("mode", "FAILFAST")
    .option("escapeQuotes", "true")
    .option("inferSchema", "false")
    .option("multiline", "true")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .schema(min_schema)
    .csv('C:/Users/pc/Desktop/sample2.csv', header=True)
)

    >>> df.select('dummy_after').show(truncate=False)
    +-----------------------------------+
    |dummy_after                        |
    +-----------------------------------+
    |, Unusual value with comma included|
    +-----------------------------------+

    >>> if df.count() != df.select('record_id').distinct().count():
    ...    pass
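
For completeness (not part of the answer above): the question's stated goal was converting these files to Parquet for subsequent pipeline steps. Once the CSV reads cleanly, that is a single write; the output path below is hypothetical.

# Hypothetical follow-up step: persist the cleanly parsed DataFrame as
# Parquet so downstream pipeline stages no longer depend on CSV parsing.
df.write.mode("overwrite").parquet("preprocessed/min_repro.parquet")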