在检查字符串列并将错误值保存到数据abrick时,请在内存中。

发布于 2025-02-02 18:47:10 字数 825 浏览 1 评论 0原文

我需要在数据框架中进行双引号检查。因此,我在所有列中迭代此检查,但需要大量时间。我正在使用Azure Databricks。

for column in columns_list:
      column_name = "`" + column + "`"
      df_reject = source_data.withColumn("flag_quotes",when(source_data[column_name].rlike("[\"\"]"),lit("Yes")).otherwise(lit("No")))     
      df_quo_rejected_df = df_reject.filter(col("flag_quotes") == "Yes") 
      
     
      df_quo_rejected_df = df_quo_rejected_df.withColumn('Error', lit(err))
      df_quo_rejected_df.coalesce(1).write.mode("append").option("header","true")\
                  .option("delimiter",delimiter)\
                  .format("com.databricks.spark.csv")\
                  .save(filelocwrite)

我有大约500列,有4000万张记录。我尝试了Union数据范围的每次迭代,但是某个时候操作确实可以使用。因此,我保存数据框并将其附加到每次迭代。请帮助我以一种优化运行时间的方法。

I need to do a double quotes check in a dataframe. So I am iterating through all the columns for this check but takes lot of time. I am using Azure Databricks for this.

for column in columns_list:
      column_name = "`" + column + "`"
      df_reject = source_data.withColumn("flag_quotes",when(source_data[column_name].rlike("[\"\"]"),lit("Yes")).otherwise(lit("No")))     
      df_quo_rejected_df = df_reject.filter(col("flag_quotes") == "Yes") 
      
     
      df_quo_rejected_df = df_quo_rejected_df.withColumn('Error', lit(err))
      df_quo_rejected_df.coalesce(1).write.mode("append").option("header","true")\
                  .option("delimiter",delimiter)\
                  .format("com.databricks.spark.csv")\
                  .save(filelocwrite)

I have got around 500 columns with 40 million records. I tried union the dataframes every iteration but the operation does OOM after sometime. So I save the dataframe and append it every iteration. Please help me with a way to optimize the running time.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

困倦 2025-02-09 18:47:10

您可以尝试使用

from pyspark.sql import functions as F

columns_list = [f"`{c}`" for c in columns_list]
df_reject = source_data.filter(F.exists(F.array(*columns_list), lambda x: x.rlike("[\"\"]")))
df_cols_add = df_reject.select('*', F.lit('Yes').alias('flag_quotes'), F.lit(err).alias('Error'))

Instead of looping through columns you can try checking their values using exists.

from pyspark.sql import functions as F

columns_list = [f"`{c}`" for c in columns_list]
df_reject = source_data.filter(F.exists(F.array(*columns_list), lambda x: x.rlike("[\"\"]")))
df_cols_add = df_reject.select('*', F.lit('Yes').alias('flag_quotes'), F.lit(err).alias('Error'))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文