Scala Spark UDF that takes any input and puts it in an array
I am trying to create a Scala UDF for Spark that can be used in Spark SQL. The objective of the function is to accept any column type as input and put it in an ArrayType, unless the input is already an ArrayType.
Here's the code I have so far:
package com.my_namespace.spark.udf

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession

// If the input is already a Seq, pass it through; otherwise wrap it in an array.
class GetDatatype extends UDF1[Object, scala.collection.Seq[_]] {
  override def call(inputObject: Object): scala.collection.Seq[_] = {
    if (inputObject.isInstanceOf[scala.collection.Seq[_]]) {
      inputObject.asInstanceOf[scala.collection.Seq[_]]
    } else {
      Array(inputObject)
    }
  }
}

val myFunc = new GetDatatype().call _
val myFuncUDF = udf(myFunc)
spark.udf.register("myFuncUDF", myFuncUDF)
The data may look like this:
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|create_date|item |datatype_of_item |item2 |datatype_of_item2 |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|2021-06-01 |[item 3, 3]|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|string 3|java.lang.String |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
or this:
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|create_date|item |datatype_of_item |item2 |datatype_of_item_2 |
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|2021-05-01 |[[item 1, 1], [item 2, 2]]|scala.collection.mutable.WrappedArray$ofRef|[string 1, string 2]|scala.collection.mutable.WrappedArray$ofRef|
|2021-06-01 |[[item 3, 3]] |scala.collection.mutable.WrappedArray$ofRef|[string 3] |scala.collection.mutable.WrappedArray$ofRef|
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
The UDF may be passed the contents of either the item or the item2 column.
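For reference, the intended Spark SQL usage would be something like the following, where my_table stands in for the actual registered view:

spark.sql("SELECT myFuncUDF(item) AS item_arr, myFuncUDF(item2) AS item2_arr FROM my_table")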
However, when executing this line:
val myFuncUDF = udf(myFunc)
I get the following error:
scala> val myFuncUDF = udf(myFunc)
java.lang.UnsupportedOperationException: Schema for type Any is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743)
1 Answer
Spark cannot use UDFs with this return type (Any, or Object). You could do it without a UDF, I think:
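A minimal sketch of that idea, assuming the goal is to inspect the column's DataType in the DataFrame schema and wrap non-array columns with Spark's built-in array function (the helper name wrapInArray and the sample frame are illustrative, not from the original answer):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.ArrayType
import spark.implicits._  // `spark` is the SparkSession provided by spark-shell

// Wrap column `c` in a single-element array unless it is already an ArrayType.
def wrapInArray(df: DataFrame, c: String): DataFrame =
  df.schema(c).dataType match {
    case _: ArrayType => df                               // already an array: leave as-is
    case _            => df.withColumn(c, array(col(c)))  // wrap the scalar value
  }

// A toy frame mirroring the scalar item2 case above:
val df = Seq(("2021-06-01", "string 3")).toDF("create_date", "item2")
wrapInArray(df, "item2").printSchema()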
gives the schema, with item2 now reported as an array of strings.