Scala: UDF on DataFrame fails after explode
I have a Scala DataFrame with the following schema:
root
|-- time: string (nullable = true)
|-- itemId: string (nullable = true)
|-- itemFeatures: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
I want to explode the itemFeatures column and then send my DataFrame to a UDF. But as soon as I include the explode, calling the UDF results in this error: org.apache.spark.SparkException: Task not serializable. I can't figure out why.
Environment: Scala 2.11.12, Spark 2.4.4
Full example:
val dataList = List(
  ("time1", "id1", "map1"),
  ("time2", "id2", "map2"))
val df = dataList.toDF("time", "itemId", "itemFeatures")
val dfExploded = df.select(col("time"), col("itemId"), explode("itemFeatures"))
val doNextThingUDF: UserDefinedFunction = udf(doNextThing _)
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time"))
where my UDF looks like this:
val doNextThing(time: String): String = {
  time + "blah"
}
If I remove the explode, everything works fine, and if I don't call the UDF after the explode, everything also works fine. I could imagine Spark being unable to send each row to a UDF if it is dynamically executing the explode and doesn't know how many rows are going to exist, but even when I add, e.g., dfExploded.cache() and dfExploded.count(), I still get the error. Is this a known issue? What am I missing?
1 Answer
I think the issue comes from how you define your doNextThing function. Also, there are a couple of typos in your "full example". In particular, the itemFeatures column is a string in your example; I understand it should be a Map. But here is a working example:
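(The answer's code itself does not appear on this page; the sketch below is a reconstruction of what a working version might look like, assuming Spark 2.4 and Scala 2.11 as in the question, with made-up map values. It fixes the points raised above: doNextThing is declared with def rather than val, itemFeatures is built as a real Map, explode receives a Column, and the unbalanced parenthesis in the withColumn call is closed.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

val spark = SparkSession.builder()
  .appName("explode-then-udf")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// A method declared with def (val doNextThing(...) is not valid Scala)
def doNextThing(time: String): String = time + "blah"
val doNextThingUDF = udf(doNextThing _)

// itemFeatures as an actual Map column (keys and values here are illustrative)
val dataList = List(
  ("time1", "id1", Map("f1" -> "v1", "f2" -> "v2")),
  ("time2", "id2", Map("f3" -> "v3")))
val df = dataList.toDF("time", "itemId", "itemFeatures")

// explode takes a Column; on a map it produces one (key, value) row per entry
val dfExploded = df.select(col("time"), col("itemId"), explode(col("itemFeatures")))

// Note the balanced parentheses on this call
val dfNextThing = dfExploded.withColumn("nextThing", doNextThingUDF(col("time")))

dfNextThing.show()

One caveat on the Task not serializable error: if doNextThing is a method on a non-serializable class, udf(doNextThing _) captures the enclosing instance, and Spark then fails to serialize the task. Defining the function in a standalone object, or as a plain function value, avoids pulling that extra state into the closure.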