如何将所有列转换为Pyspark中的行?

发布于 2025-02-01 01:16:02 字数 2450 浏览 4 评论 0原文

我正在尝试将列转换为行并将其加载到数据库中。我的输入是JSON文件。

{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}

pyspark:

df = spark.read.option(“ multiline”,“ true”)。格式(“ json”)。加载(“ testfile.json”)


Schema:

root
 |-- 09087: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 090881: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

data:data:

df.show()
+--------------------+--------------------+
|               09087|              090881|
+--------------------+--------------------+
|{[76573433, 22223...|{[76573443433, 22...|
+--------------------+--------------------+

output:output:

Name        values            values1           values2              values3
09087       76573433          7686548898        234523723            87765
09087       2222322323        33256768          64238793333333       9234689677
09087       768346865         09864324567       75478393333          46389333
090881      76573443433       768637676548898   23877644523723       87765
090881      22276762322323    3398776256768     64238867658793333333 46389333
090881      7683878746865     0986456834324567  754788776393333      9234689677

实际上我刚刚给了2个列是输入,但我有很多。我一直在尝试这种方法 - 有人可以帮助我。提前致谢。

I am trying to transpose the columns to rows and load it to the data base. My input is the Json file.

{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}

Pyspark:

df = spark.read.option("multiline", "true").format("json").load("testfile.json")


Schema:

root
 |-- 09087: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- 090881: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values1: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values2: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- values3: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

Data:

df.show()
+--------------------+--------------------+
|               09087|              090881|
+--------------------+--------------------+
|{[76573433, 22223...|{[76573443433, 22...|
+--------------------+--------------------+

OUTPUT:

Name        values            values1           values2              values3
09087       76573433          7686548898        234523723            87765
09087       2222322323        33256768          64238793333333       9234689677
09087       768346865         09864324567       75478393333          46389333
090881      76573443433       768637676548898   23877644523723       87765
090881      22276762322323    3398776256768     64238867658793333333 46389333
090881      7683878746865     0986456834324567  754788776393333      9234689677

Actually I just gave 2 columns as input but I have lot of them. I have been trying this- could someone please help me on this. Thanks in advance.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

谎言 2025-02-08 01:16:02

我的Scala解决方案的Pyspark翻译:

from pyspark.sql import 
rdd = spark.sparkContext.parallelize([("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )])
df = spark.read.json(rdd)

df.select(\
 explode (\#explode array into rows
  array(\
   *[ struct(\# make a stuct from column name and values 
    lit( col_name ).alias("Name"),\
    col(col_name+".*")\
  ) for col_name in df.columns ])))\
.select(\
  col("col.Name").alias("Name"),\
  explode(\
   arrays_zip(\# make an array of structs from multiple arrays.  The name of the struct.column will be it's index in the orginal array.
    col("col.values"),\
    col("col.values1"),\
    col("col.values2"),\
    col("col.values3")\
   )\
  ).alias("columns")\
).select( col("Name"),col("columns.*")).show()#use '.*' syntax to change struct.column into table.column
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+

Pyspark translation of my scala solution:

from pyspark.sql import 
rdd = spark.sparkContext.parallelize([("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )])
df = spark.read.json(rdd)

df.select(\
 explode (\#explode array into rows
  array(\
   *[ struct(\# make a stuct from column name and values 
    lit( col_name ).alias("Name"),\
    col(col_name+".*")\
  ) for col_name in df.columns ])))\
.select(\
  col("col.Name").alias("Name"),\
  explode(\
   arrays_zip(\# make an array of structs from multiple arrays.  The name of the struct.column will be it's index in the orginal array.
    col("col.values"),\
    col("col.values1"),\
    col("col.values2"),\
    col("col.values3")\
   )\
  ).alias("columns")\
).select( col("Name"),col("columns.*")).show()#use '.*' syntax to change struct.column into table.column
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+
初见终念 2025-02-08 01:16:02
//make dummy data

val df = spark.sqlContext.read.json(res4)
val rdd = spark.sparkContext.parallelize(Seq(("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )))
val df = spark.sqlContext.read.json(rdd)

df.select( 
 explode ( // explode an array into rows
  array( // make an array
   (for( col_name <- df.columns ) 
   yield 
    struct(  //create struct with names that can be use as columns
     lit(s"$col_name").as("Name") , 
     col(s"$col_name.*")
    )
   ).toSeq :_*  // make sequence into VarArgs
  ).as("rows") 
 ) 
).select( 
  col("col.Name"), 
  expr("explode(
          arrays_zip( 
           col.values , 
           col.values1, 
           col.values2, 
           col.values3)) "  //use array_zip to suck together multiple identical length arrays into 1 array(of structs) with struct containing the names column of the index.
  ).as("columns") 
).select( 
  col("Name"), 
  col("columns.*") // rename as required.
).show()
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+

有关

//make dummy data

val df = spark.sqlContext.read.json(res4)
val rdd = spark.sparkContext.parallelize(Seq(("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )))
val df = spark.sqlContext.read.json(rdd)

df.select( 
 explode ( // explode an array into rows
  array( // make an array
   (for( col_name <- df.columns ) 
   yield 
    struct(  //create struct with names that can be use as columns
     lit(s"$col_name").as("Name") , 
     col(s"$col_name.*")
    )
   ).toSeq :_*  // make sequence into VarArgs
  ).as("rows") 
 ) 
).select( 
  col("col.Name"), 
  expr("explode(
          arrays_zip( 
           col.values , 
           col.values1, 
           col.values2, 
           col.values3)) "  //use array_zip to suck together multiple identical length arrays into 1 array(of structs) with struct containing the names column of the index.
  ).as("columns") 
).select( 
  col("Name"), 
  col("columns.*") // rename as required.
).show()
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+

for more info on arrays_zip see here.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文