Writing dataframe partitions to custom directories in Scala

Posted on 2025-02-01 11:24:04

I have a dataframe with millions of records and need to partition the data into S3 bucket folders of less than 200 MB or 200,000 rows each using a Glue job. Using partitionBy won't work because no column value splits the data in a way that keeps the partitions below the size required by some downstream processes. I tried adding a monotonically increasing ID and writing based on predefined ID ranges, but that doesn't work because monotonically_increasing_id is not consecutive. How do I get a Glue job to write partitioned data into S3 folders of less than 200 MB, or is there a way to partitionBy a repartitioned dataframe?

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

    // Tag each row with an id, then write the rows out in fixed-size batches.
    // This does not work as intended because monotonically_increasing_id() is
    // unique but not consecutive, so the id ranges below do not correspond to
    // batchSize rows.
    val newdf = diffDF.withColumn("id", monotonically_increasing_id())
    val batchSize = 100000
    val totalRecordCount = diffDF.count()
    var currentRow = 0L
    while (currentRow < totalRecordCount) {
      val segmentDF = newdf
        .where(col("id") >= currentRow and col("id") < (currentRow + batchSize))
        .drop("id")
      segmentDF.write.option("header", "true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
      currentRow = currentRow + batchSize
    }
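
One way to get the consecutive IDs this loop assumes would be to number the rows with row_number over a window; this is a minimal sketch, not part of the original question, and an un-partitioned window has its own cost:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

    // Sketch: assign consecutive 1-based ids so that fixed id ranges really do
    // contain batchSize rows (adjust the loop bounds for the 1-based start).
    // Caveat: a window with no partitionBy pulls every row through a single
    // partition, which can be slow for millions of records.
    val w = Window.orderBy(monotonically_increasing_id())
    val newdf = diffDF.withColumn("id", row_number().over(w))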

Comments (2)

This is a Scala-ish solution that uses folding. I tried adapting the same logic to Spark, and the closest thing Spark RDDs have right now is rdd.aggregate, but the combineOp in its parameter list ruins everything! So if you are comfortable using RDDs, this approach or something similar in Spark could work for you:

    import org.apache.spark.sql.Row

    val rdd = df.rdd
    // Fold the collected rows into chunks; the chunk size (3 here) stands in
    // for whatever limit you actually need.
    val chunks: List[List[Row]] = rdd.collect().foldLeft(List.empty[List[Row]]) {
      case (l @ (headAggregator :: tail), newRow) =>
        // This `if` represents the size check; instead of list length you could
        // track the accumulated row count or byte size of the current chunk.
        if ((newRow :: headAggregator).length < 3) (newRow :: headAggregator) :: tail
        else (newRow :: Nil) :: l
      case (Nil, newRow) =>
        (newRow :: Nil) :: Nil
    }

I know this rdd.collect() is actually very expensive, but I just wanted to implement the logic; if you find something similar to foldLeft for RDDs, just copy and paste the function body :)
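
For completeness, here is a minimal sketch of one way the resulting chunks could be written out, assuming the collected rows fit on the driver and that spark, df, and tpath are the session, dataframe, and target path from the question; chunks is the List[List[Row]] built by the fold above:

    import org.apache.spark.sql.SaveMode

    // Hypothetical follow-up: turn each chunk back into a DataFrame and write
    // it to its own folder under tpath.
    chunks.zipWithIndex.foreach { case (rows, i) =>
      spark.createDataFrame(spark.sparkContext.parallelize(rows), df.schema)
        .write
        .option("header", "true")
        .mode(SaveMode.Overwrite)
        .csv(tpath + "/" + i)
    }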

硪扪都還晓 2025-02-08 11:24:04

What I ended up doing was adding a column containing the remainder of dividing the ID values by a fixed number.

    diffDF.withColumn("partitionnum", col("Employee_ID") % 9)
      .write.option("header", "true").partitionBy("partitionnum").mode(SaveMode.Overwrite).csv(tpath)

This gives 9 partitions and is easily customized: use modulo 5 for 5 partitions, and so on.
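
Building on that, a minimal sketch of deriving the divisor from the row count so each folder stays under the 200,000-row target from the question; it assumes Employee_ID values are reasonably evenly distributed:

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.col

    // Pick enough buckets that each partitionnum folder holds at most ~200,000
    // rows; heavy skew in Employee_ID would still make some buckets larger.
    val targetRows = 200000L
    val numBuckets = math.max(1L, (diffDF.count() + targetRows - 1) / targetRows)

    diffDF.withColumn("partitionnum", col("Employee_ID") % numBuckets)
      .write.option("header", "true").partitionBy("partitionnum")
      .mode(SaveMode.Overwrite).csv(tpath)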
