Scala Spark UDF function that takes its input and puts it in an array

I am trying to create a Scala UDF for Spark that can be used in Spark SQL. The objective of the function is to accept any column type as input and wrap it in an ArrayType, unless the input is already an ArrayType.

Here's the code I have so far:

package com.my_namespace.spark.udf

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types._

import org.apache.spark.sql.SparkSession


class GetDatatype extends UDF1[Object, scala.collection.Seq[_]] {
    override def call(inputObject: Object): scala.collection.Seq[_] = {
        if (inputObject.isInstanceOf[scala.collection.Seq[_]]) {
            return inputObject.asInstanceOf[scala.collection.Seq[_]]
        } else {
            return Array(inputObject)
        }
    }
}

val myFunc = new GetDatatype().call _
val myFuncUDF = udf(myFunc)
spark.udf.register("myFuncUDF", myFuncUDF)

The data may look like this:

+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|create_date|item       |datatype_of_item                                              |item2   |datatype_of_item2              |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+
|2021-06-01 |[item 3, 3]|org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema|string 3|java.lang.String               |
+-----------+-----------+--------------------------------------------------------------+--------+-------------------------------+

or this:

+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|create_date|item                      |datatype_of_item                           |item2               |datatype_of_item_2                         |
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+
|2021-05-01 |[[item 1, 1], [item 2, 2]]|scala.collection.mutable.WrappedArray$ofRef|[string 1, string 2]|scala.collection.mutable.WrappedArray$ofRef|
|2021-06-01 |[[item 3, 3]]             |scala.collection.mutable.WrappedArray$ofRef|[string 3]          |scala.collection.mutable.WrappedArray$ofRef|
+-----------+--------------------------+-------------------------------------------+--------------------+-------------------------------------------+

The UDF may be passed the contents of either the item or the item2 column.

However, when executing this line:

val myFuncUDF = udf(myFunc)

I get the following error:

scala> val myFuncUDF = udf(myFunc)
java.lang.UnsupportedOperationException: Schema for type Any is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.$anonfun$schemaFor$1(ScalaReflection.scala:743)


Comments (1)

落墨 2025-02-03 02:22:59

Spark cannot use UDFs with this return type (Any or Object). I think you could do it without a UDF:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.ArrayType
import spark.implicits._  // for .toDF; spark is the active SparkSession

val df = Seq(
  (Seq((1, "a"), (2, "b")), (1, "a"))
).toDF("item", "item 2")

// Wrap the column in array() unless it is already an ArrayType
def wrapInArray(df: DataFrame, c: String) =
  if (df.schema(c).dataType.isInstanceOf[ArrayType]) col(c) else array(col(c))

df
  .withColumn("test", wrapInArray(df, "item"))
  .withColumn("test 2", wrapInArray(df, "item 2"))

which gives the schema:

root
 |-- item: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- item 2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: string (nullable = true)
 |-- test: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
 |-- test 2: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)
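
The root cause is that udf derives a Catalyst schema from the function's Scala type signature by reflection, so any signature that erases to Any fails at definition time, before the function ever runs. A minimal repro of the same error (my own illustration, in the shell):

import org.apache.spark.sql.functions.udf

// Any carries no schema information, so Spark cannot map it to a DataType
val anyUdf = udf((x: Any) => x)
// java.lang.UnsupportedOperationException: Schema for type Any is not supported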
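
If every column should get the same treatment, the check can be folded over df.columns. A sketch building on wrapInArray above (the helper and view names are my own):

import org.apache.spark.sql.DataFrame

// Replace each column with its array-wrapped version
def wrapAllInArrays(df: DataFrame): DataFrame =
  df.columns.foldLeft(df)((acc, c) => acc.withColumn(c, wrapInArray(acc, c)))

val wrapped = wrapAllInArrays(df)
wrapped.printSchema()

// This is plain DataFrame code rather than a UDF, so for Spark SQL
// register the already-wrapped result as a view instead
wrapped.createOrReplaceTempView("items_wrapped")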