Writing DataFrame partitions to custom directories in Scala
I have a DataFrame with millions of records and need to partition the data into S3 bucket folders of less than 200MB or 200,000 rows each, using a Glue job. Using partitionBy won't work because there is no column value that splits the data in a way that keeps the partitions below the size required by some downstream processes. I tried adding a monotonically increasing id and writing based on predefined id ranges, but that doesn't work because monotonically_increasing_id is not consecutive. How do I get a Glue job to write partitioned data into S3 folders of less than 200MB, or is there a way to partitionBy a repartitioned DataFrame? Here is what I tried:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

// Tag every row with an id, then write the data out in slices of batchSize rows.
// This fails in practice because the ids are not consecutive, so many slices
// come out empty or undersized.
val newdf = diffDF.withColumn("id", monotonically_increasing_id())
val batchSize = 100000L
val totalRecordCount = diffDF.count()
var currentRow = 0L
while (currentRow < totalRecordCount) {
  val segmentDF = newdf
    .where(col("id") >= currentRow && col("id") < (currentRow + batchSize))
    .drop("id")
  segmentDF.write.option("header", "true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
  currentRow += batchSize
}
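For reference, a quick sketch (assuming a SparkSession named spark) of why the id ranges come up empty: monotonically_increasing_id puts the partition index in the upper bits of the id, so values jump by roughly 8.6 billion between partitions, and a filter like id >= 0 && id < 100000 only ever matches rows from the first partition.

import org.apache.spark.sql.functions.monotonically_increasing_id

// Ids are unique and increasing but not consecutive: partition i starts at i * 2^33.
val ids = spark.range(0, 9, 1, numPartitions = 3)
  .withColumn("mono_id", monotonically_increasing_id())
ids.show(false)
// mono_id values: 0, 1, 2, 8589934592, 8589934593, ..., 17179869184, ...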
2 Answers
This is a Scala-ish solution that uses folding. I tried adapting the same logic to Spark, and the most similar thing Spark RDDs have right now is rdd.aggregate, whose combineOp parameter just ruins everything! So if you feel comfortable using RDDs, this approach or something similar in Spark would work for you. I know this rdd.collect() is actually very expensive, but I just implemented the logic, so if you find something similar to foldLeft for RDDs, just copy and paste the function body :)
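The answerer's original code was not preserved on this page, but here is a minimal sketch of the foldLeft-style batching being described; the function name writeInBatches, the maxRows limit, and the use of collect() are assumptions for illustration, not the answerer's exact code.

import org.apache.spark.sql.{DataFrame, Row, SaveMode}

// Hypothetical sketch: pull the rows to the driver, fold them into batches of at
// most maxRows rows, then write each batch to its own folder under tpath.
def writeInBatches(df: DataFrame, tpath: String, maxRows: Int = 200000): Unit = {
  val spark  = df.sparkSession
  val schema = df.schema

  // collect() is expensive, as the answer itself notes; this only works when the
  // data fits on the driver and is meant purely to show the folding logic.
  val (_, batches) =
    df.collect().foldLeft((0, List(List.empty[Row]))) { case ((count, acc), row) =>
      if (count < maxRows) (count + 1, (row :: acc.head) :: acc.tail)
      else (1, List(row) :: acc)
    }

  batches.reverse.zipWithIndex.foreach { case (rows, i) =>
    val batchDF = spark.createDataFrame(spark.sparkContext.parallelize(rows.reverse), schema)
    batchDF.write.option("header", "true").mode(SaveMode.Overwrite).csv(s"$tpath/batch_$i")
  }
}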
What I ended up doing was adding a column that is the remainder of dividing the id values.
This gives 9 partitions and is highly customizable; you can divide by 5 for 5 partitions, etc.
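A minimal sketch of that approach, assuming the id column from the question and a hypothetical bucket column: taking the id modulo 9 yields values 0 through 8, so partitionBy writes 9 folders (use 5 instead for 5 folders, and so on).

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

// "bucket" is an assumed column name: id % 9 has 9 possible remainders (0..8),
// so partitionBy("bucket") produces 9 output folders under tpath.
val bucketed = diffDF
  .withColumn("id", monotonically_increasing_id())
  .withColumn("bucket", col("id") % 9)
  .drop("id")

bucketed.write
  .partitionBy("bucket")
  .option("header", "true")
  .mode(SaveMode.Overwrite)
  .csv(tpath)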