Scala爆炸后,然后在数据框架上进行UDF失败

发布于 2025-01-29 00:27:34 字数 1179 浏览 2 评论 0原文

我有一个带有以下模式的Scala数据框:

root
 |-- time: string (nullable = true)
 |-- itemId: string (nullable = true)
 |-- itemFeatures: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

我想爆炸itemfeatures列,然后将我的数据框发送到UDF。但是,一旦我包括爆炸,调用UDF会导致此错误: org.apache.spark.sparkexception:任务不是序列化

我不知道为什么???

环境:Scala 2.11.12,Spark 2.4.4

完整示例:

val dataList = List(
    ("time1", "id1", "map1"),
    ("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))

val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))

我的UDF看起来像这样:

val doNextThing(time: String): String = {
  time+"blah"
}

如果我删除爆炸,一切正常,或者如果我在爆炸后不调用UDF ,一切都很好。我可以想象,如果Spark动态执行爆炸件,并且不知道将存在多少行,但是即使我添加ex dfexploded.cache()<,但即使我不知道要存在多少行,也无法将每行发送到UDF。 /code>和dfexploded.count()我仍然会出现错误。这是一个已知问题吗?我想念什么?

I have a scala dataframe with the following schema:

root
 |-- time: string (nullable = true)
 |-- itemId: string (nullable = true)
 |-- itemFeatures: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

I want to explode the itemFeatures column and then send my dataframe to a UDF. But as soon as I include the explode, calling the UDF results in this error:
org.apache.spark.SparkException: Task not serializable

I can't figure out why???

Environment: Scala 2.11.12, Spark 2.4.4

Full example:

val dataList = List(
    ("time1", "id1", "map1"),
    ("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))

val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))

where my UDF looks like this:

val doNextThing(time: String): String = {
  time+"blah"
}

If I remove the explode, everything works fine, or if I don't call the UDF after the explode, everything works fine. I could imagine Spark is somehow unable to send each row to a UDF if it is dynamically executing the explode and doesn't know how many rows that are going to exist, but even when I add ex dfExploded.cache() and dfExploded.count() I still get the error. Is this a known issue? What am I missing?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

┈┾☆殇 2025-02-05 00:27:34

我认为问题来自您如何定义Donextthing函数。还
您的“完整示例”中有几个错别字。

尤其是ItemFeatures列是您示例的字符串,我知道它应该是一张地图。
但这是一个有效的例子:

val dataList = List(
    ("time1", "id1", Map("map1" -> 1)),
    ("time2", "id2", Map("map2" -> 2)))

val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode($"itemFeatures"))

val doNextThing = (time: String) => {time+"blah"}
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))

I think the issue come from how you define your donextThing function. Also
there is couple of typos in your "full example".

Especially the itemFeatures column is a string in your example, I understand it should be a Map.
But here is a working example:

val dataList = List(
    ("time1", "id1", Map("map1" -> 1)),
    ("time2", "id2", Map("map2" -> 2)))

val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode(
quot;itemFeatures"))

val doNextThing = (time: String) => {time+"blah"}
val doNextThingUDF = udf(doNextThing)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文