如何使用UDF处理大的增量表?

发布于 2025-01-16 20:47:54 字数 377 浏览 2 评论 0原文

我有一个包含大约 3000 亿行的增量表。现在我正在使用 UDF 对一列执行一些操作并创建另一列

我的代码是这样的

def my_udf(data):
    return pass
       
 
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))

现在的问题是这需要很长时间,因为 Spark 将处理所有 3000 亿行,然后写入输出。有没有一种方法可以让我们进行一些 Mirco 批处理并将其定期写入输出增量表

I have a delta table with about 300 billion rows. Now I am performing some operations on a column using UDF and creating another column

My code is something like this

def my_udf(data):
    return pass
       
 
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))

The issue now is this take a long amount of time as Spark will process all 300 billion rows and then write the output. Is there a way where we can do some Mirco batching and write output of those regularly to the output delta table

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦里寻她 2025-01-23 20:47:54

第一条规则通常是尽可能避免 UDF - 您需要执行哪种 Spark 本身不可用的转换?

第二条规则 - 如果无法避免使用 UDF,至少使用 Pandas UDF 批量处理数据,并且没有如此大的序列化/反序列化开销 - 通常的 UDF 逐行处理数据,编码和处理数据。为每个人解码数据。

如果您的表是随着时间的推移而构建的,并且包含许多文件,您可以尝试将 Spark 结构化流与 Trigger.AvailableNow 结合使用(需要 DBR 10.3 或 10.4),如下所示:

maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
  .option("maxFilesPerTrigger", maxNumFiles) \ 
  .table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

这将读取源表逐块应用转换,并将数据写入目标表。

The first rule usually is to avoid UDFs as much of possible - what kind of transformation do you need to perform that isn't available in the Spark itself?

Second rule - if you can't avoid using UDF, at least use Pandas UDFs that process data in batches, and don't have so big serialization/deserialization overhead - usual UDFs are handling data row by row, encoding & decoding data for each of them.

If your table was built over the time, and consists of many files, you can try to use Spark Structured Streaming with Trigger.AvailableNow (requires DBR 10.3 or 10.4), something like this:

maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
  .option("maxFilesPerTrigger", maxNumFiles) \ 
  .table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

this will read the source table chunk by chunk, apply your transformation, and write data into a destination table.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文