Adding multiple columns without using a UDF


I want to parse the address column from the given table structure using the addressParser function to get number, street, city, and country.

Sample Input:

addressId | address
----------|------------------------------------------
ADD001    | 384, East Avenue Street, New York, USA
ADD002    | 123, Maccolm Street, Copenhagen, Denmark

The sample code is attached for reference:


import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object ParseAddress extends App {

  val spark = SparkSession.builder().master("local[*]").appName("ParseAddress").getOrCreate()

  import spark.implicits._

  case class AddressRawData(addressId: String, address: String)

  case class AddressData(
                          addressId: String,
                          address: String,
                          number: Option[Int],
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )

  // Splits each address on ", " and fills the four parsed fields
  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map { address =>
      val split = address.address.split(", ")

      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
  }

  // Source DataFrame built from the sample input above
  val addressDF: DataFrame = Seq(
    ("ADD001", "384, East Avenue Street, New York, USA"),
    ("ADD002", "123, Maccolm Street, Copenhagen, Denmark")
  ).toDF("addressId", "address")

  val addressDS: Dataset[AddressRawData] = addressDF.as[AddressRawData]

}

Expected output:

addressId | address                                   | number | road               | city       | country
----------|-------------------------------------------|--------|--------------------|------------|--------
ADD001    | 384, East Avenue Street, New York, USA    | 384    | East Avenue Street | New York   | USA
ADD002    | 123, Maccolm Street, Copenhagen, Denmark  | 123    | Maccolm Street     | Copenhagen | Denmark

I am not sure how I should pass addressDS as an input to the function to parse the column data. Any help in solving this problem is much appreciated.


Answer (戏剧牡丹亭, 2025-02-10):


I think one important point is that it is better to design your function to take a single input and return a single output (in your scenario). If you then have a collection or a Dataset of rows, you can map each row through that function; that is exactly why all these polymorphic functions (map, flatMap, fold, ...) exist. So you can implement a method that receives an AddressRawData and returns an AddressData:

def singleAddressParser(unparsedAddress: AddressRawData): AddressData = {
  // Assumes every address has exactly four comma-separated parts:
  // number, road, city, country
  val split = unparsedAddress.address.split(", ")
  AddressData(
    addressId = unparsedAddress.addressId,
    address = unparsedAddress.address,
    number = Some(split(0).toInt),
    road = Some(split(1)),
    city = Some(split(2)),
    country = Some(split(3))
  )
}

And then map each raw record through this function:

import org.apache.spark.sql.Dataset

val addressDS: Dataset[AddressData] = 
  addressDF.as[AddressRawData].map(singleAddressParser)

And this is the output, as expected:

scala> addressDS.show(false)
+---------+----------------------------------------+------+------------------+----------+-------+
|addressId|address                                 |number|road              |city      |country|
+---------+----------------------------------------+------+------------------+----------+-------+
|ADD001   |384, East Avenue Street, New York, USA  |384   |East Avenue Street|New York  |USA    |
|ADD002   |123, Maccolm Street, Copenhagen, Denmark|123   |Maccolm Street    |Copenhagen|Denmark|
+---------+----------------------------------------+------+------------------+----------+-------+
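
One caveat worth noting: split(0).toInt throws a NumberFormatException on a non-numeric house number, and split(1) through split(3) throw ArrayIndexOutOfBoundsException if an address has fewer than four comma-separated parts, which would fail the whole Spark job. A minimal defensive sketch, assuming malformed rows should yield None fields rather than an error (safeAddressParser is an illustrative name, not from the original post):

import scala.util.Try

def safeAddressParser(raw: AddressRawData): AddressData = {
  val split = raw.address.split(", ")
  AddressData(
    addressId = raw.addressId,
    address = raw.address,
    // lift returns None for a missing index instead of throwing,
    // and Try(...).toOption absorbs a non-numeric house number
    number = split.lift(0).flatMap(s => Try(s.trim.toInt).toOption),
    road = split.lift(1),
    city = split.lift(2),
    country = split.lift(3)
  )
}

The result type is unchanged, so the map call above works the same way with this variant.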
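As an aside on the "without using a UDF" requirement: the typed map above is not a UDF, but the same columns can also be derived entirely with built-in column functions, staying in the untyped DataFrame API. A sketch under the same four-part assumption (parsedDF is an illustrative name):

import org.apache.spark.sql.functions.{col, split}

// split produces an array column; getItem extracts each part,
// and cast("int") yields null for a non-numeric house number
val parts = split(col("address"), ", ")
val parsedDF = addressDF
  .withColumn("number", parts.getItem(0).cast("int"))
  .withColumn("road", parts.getItem(1))
  .withColumn("city", parts.getItem(2))
  .withColumn("country", parts.getItem(3))

If the typed view is still needed, parsedDF.as[AddressData] should line up, since the derived columns match the case class fields (the nullable integer column encodes to Option[Int]).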