Using enumerate to get the partition columns from a DataFrame

I am trying to get all columns and their data types into one variable, and only the partition columns into another variable of list type in Python.

Getting the details from describe extended.

df = spark.sql("describe extended schema_name.table_name")

    +----------------------------+-----------+
    |col_name                    |data_type  |
    +----------------------------+-----------+
    |col1                        |string     |
    |col2                        |int        |
    |col3                        |string     |
    |col4                        |int        |
    |col5                        |string     |
    |# Partition Information     |           |
    |# col_name                  |data_type  |
    |col4                        |int        |
    |col5                        |string     |
    |                            |           |
    |# Detailed Table Information|           |
    |Database                    |schema_name|
    |Table                       |table_name |
    |Owner                       |owner.name |
    +----------------------------+-----------+

Converting the result into a list.

des_list = df.select(df.col_name, df.data_type).rdd.map(lambda x: (x[0], x[1])).collect()

Here is how I am trying to get all the columns (all items before '# Partition Information').

    all_cols_name_type = []
    for index, item in enumerate(des_list):
        if item[0] == '# Partition Information':
            all_cols_name_type.append(des_list[:index])

For the partitions, I would like to get everything between the '# col_name' row and the blank row just before '# Detailed Table Information'.

Any help to achieve this is appreciated.
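
For illustration, a minimal sketch of the slicing described above might look like this (it assumes des_list has exactly the layout shown in the DESCRIBE output: a '# col_name' header row, the partition rows, and then a blank row):

    # all regular columns: everything before '# Partition Information'
    part_info_idx = next(i for i, item in enumerate(des_list)
                         if item[0] == '# Partition Information')
    all_cols_name_type = des_list[:part_info_idx]

    # partition columns: rows after the '# col_name' header, up to the blank row
    partition_cols = []
    for name, dtype in des_list[part_info_idx + 2:]:
        if not name.strip():
            break
        partition_cols.append((name, dtype))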

Comments (2)

黎夕旧梦 2025-01-31 13:32:06

You can try using the following answer, or its equivalent, in Scala:

    val (partitionCols, dataCols) = spark.catalog.listColumns("schema_name.table_name")
      .collect()
      .partition(c => c.isPartition)

    val parCols = partitionCols.map(c => (c.name, c.dataType))
    val datCols = dataCols.map(c => (c.name, c.dataType))
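
A rough PySpark sketch of the same catalog-based idea (assuming an active spark session and a table registered in the catalog; listColumns returns Column entries with name, dataType and isPartition fields):

    # list the table's columns from the catalog
    cols = spark.catalog.listColumns("table_name", dbName="schema_name")

    # split into partition columns and regular data columns
    par_cols = [(c.name, c.dataType) for c in cols if c.isPartition]
    dat_cols = [(c.name, c.dataType) for c in cols if not c.isPartition]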

If the table is not defined in the catalog (e.g. when reading a Parquet dataset directly from S3 using spark.read.parquet("s3://path/...")), then you can use the following snippet in Scala:

    val (partitionSchema, dataSchema) = df.queryExecution.optimizedPlan match {
      case LogicalRelation(hfs: HadoopFsRelation, _, _, _) =>
        (hfs.partitionSchema, hfs.dataSchema)
      case DataSourceV2ScanRelation(_, scan: FileScan, _) =>
        (scan.readPartitionSchema, scan.readDataSchema)
      case _ => (StructType(Seq()), StructType(Seq()))
    }

    val parCols = partitionSchema.map(f => (f.name, f.dataType))
    val datCols = dataSchema.map(f => (f.name, f.dataType))
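
PySpark does not expose the optimized plan for this kind of pattern matching, so a rough Python workaround (my own suggestion, not part of the answer above) for a DataFrame read directly from Hive-style partitioned files is to infer the partition column names from the key=value directory segments of DataFrame.inputFiles():

    # hypothetical path; assumes a layout such as .../col4=2021/col5=abc/part-0.parquet
    df = spark.read.parquet("s3://bucket/path/")

    partition_cols = set()
    for path in df.inputFiles():          # best-effort list of the underlying files
        for segment in path.split("/"):
            if "=" in segment:            # directory segments like "col4=2021"
                partition_cols.add(segment.split("=", 1)[0])

    data_cols = [c for c in df.columns if c not in partition_cols]
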
云淡风轻 2025-01-31 13:32:06

There is a trick to do so: you can use monotonically_increasing_id to give each row a number, find the row whose col_name is '# col_name', and take that index. Something like this:

My sample table:

import pyspark.sql.functions as F

df = spark.sql('describe data')
df = df.withColumn('id', F.monotonically_increasing_id())
df.show()

+--------------------+---------+-------+---+
|            col_name|data_type|comment| id|
+--------------------+---------+-------+---+
|                  c1|      int|   null|  0|
|                  c2|   string|   null|  1|
|# Partition Infor...|         |       |  2|
|          # col_name|data_type|comment|  3|
|                  c2|   string|   null|  4|
+--------------------+---------+-------+---+
The tricky part:
idx = df.where(F.col('col_name') == '# col_name').first()['id']
# 3

partition_cols = [r['col_name'] for r in df.where(F.col('id') > idx).collect()]
# ['c2']
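
The sample output above stops right after the partition rows, so id > idx is enough. For a table whose DESCRIBE output also has a '# Detailed Table Information' section, as in the question, you would additionally stop at the blank separator row. A sketch building on the df, idx and F from the snippet above (treating an empty col_name as that separator is an assumption on my part):

end_row = (df.where((F.col('id') > idx) & (F.col('col_name') == ''))
             .orderBy('id').first())

partition_rows = df.where(F.col('id') > idx)
if end_row is not None:
    partition_rows = partition_rows.where(F.col('id') < end_row['id'])

partition_cols = [r['col_name'] for r in partition_rows.collect()]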