How to process a large Delta table using a UDF?
I have a Delta table with about 300 billion rows. I am currently performing some operations on a column using a UDF and creating another column.
My code is something like this:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def my_udf(data):
    return data  # placeholder: the real per-row logic goes here

udf_func = udf(my_udf, StringType())
data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', udf_func(data.value))
The issue now is that this takes a very long time, because Spark will process all 300 billion rows and only then write the output. Is there a way to do some micro-batching and write the output of those batches to the output Delta table regularly?
Comments (1)
The first rule usually is to avoid UDFs as much as possible - what kind of transformation do you need to perform that isn't available in Spark itself?
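For example (just an illustrative guess, since the question doesn't show the actual logic), simple string cleanup can usually be expressed with built-in functions so the work stays inside the JVM:

from pyspark.sql import functions as F

# hypothetical replacement for the UDF: built-in string functions instead of Python code
data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', F.upper(F.regexp_replace('value', '\\s+', ' ')))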
Second rule - if you can't avoid using a UDF, at least use Pandas UDFs, which process data in batches and don't have such a big serialization/deserialization overhead - usual UDFs handle data row by row, encoding & decoding the data for each of them.
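A minimal sketch of the question's code rewritten as a Pandas UDF (the function body is a placeholder, assuming the real logic can be applied to a whole pandas Series at once):

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def my_pandas_udf(values: pd.Series) -> pd.Series:
    # placeholder: apply the real transformation to an entire batch at once
    return values.str.strip()

data = spark.sql("SELECT * FROM large_table")
data = data.withColumn('new_column', my_pandas_udf(data.value))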
If your table was built over time and consists of many files, you can try to use Spark Structured Streaming with Trigger.AvailableNow (requires DBR 10.3 or 10.4), something like the sketch below. This will read the source table chunk by chunk, apply your transformation, and write the data into a destination table.
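One way to write that, as a sketch with placeholder table names and checkpoint path, reusing my_pandas_udf from the sketch above:

(spark.readStream
    .table("large_table")                                  # stream the source Delta table
    .withColumn("new_column", my_pandas_udf("value"))      # transformation applied per micro-batch
    .writeStream
    .trigger(availableNow=True)                            # Trigger.AvailableNow: drain the currently available data chunk by chunk, then stop
    .option("checkpointLocation", "/tmp/checkpoints/large_table_new_column")  # placeholder path
    .toTable("large_table_with_new_column"))               # placeholder destination Delta table

Because progress is tracked in the checkpoint, the job can also be re-run later and will only pick up data added since the previous run.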