How to flatten a nested struct using PySpark?

Posted on 2025-01-28 15:18:29


How to flatten nested struct using PySpark?

Link to dataset
https://drive.google.com/file/d/1-xOpd2B7MDgS1t4ekfipHSerIm6JMz9e/view?usp=sharing

Thanks in advance.


Comments (1)

別甾虛僞 2025-02-04 15:18:29


While I agree with Phantoms that flattening a DataFrame is fairly basic, if you still haven't figured it out you can use the function below to flatten your df:

def flattenNestedData(nestedDF):
  from pyspark.sql.functions import col, explode_outer
  from pyspark.sql.types import StructType, ArrayType
  try:
       # Fetch the complex-datatype (struct/array) columns from the schema
       fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                          if isinstance(field.dataType, (ArrayType, StructType))])
       while len(fieldNames) != 0:
         fieldName = list(fieldNames.keys())[0]
         print("Processing: " + fieldName + " Type: " + str(type(fieldNames[fieldName])))
         if isinstance(fieldNames[fieldName], StructType):
           # Promote each struct field to a top-level column named parent_child
           extractedFields = [col(fieldName + '.' + innerColName).alias(fieldName + "_" + innerColName)
                              for innerColName in [colName.name for colName in fieldNames[fieldName]]]
           nestedDF = nestedDF.select("*", *extractedFields).drop(fieldName)

         elif isinstance(fieldNames[fieldName], ArrayType):
           # Explode arrays into one row per element, keeping rows with null/empty arrays
           nestedDF = nestedDF.withColumn(fieldName, explode_outer(fieldName))

         # Re-scan the schema: flattening one level may expose new nested columns
         fieldNames = dict([(field.name, field.dataType) for field in nestedDF.schema.fields
                            if isinstance(field.dataType, (ArrayType, StructType))])

       return nestedDF

  except Exception as err:
    raise Exception("Error occurred while flattening the dataframe: " + str(err))

You can remove the ArrayType check if you don't want array columns exploded.
