将每行在火花数据框架中写入单独的JSON

发布于 2025-02-02 21:00:35 字数 518 浏览 4 评论 0原文

我有一个相当大的数据框架(百万行),其要求是将每个行存储在单独的JSON文件中。

对于此数据框架,

 root
 |-- uniqueID: string 
 |-- moreData: array 

应像以下所有行一样存储输出。

s3://.../folder[i]/<uniqueID>.json

我是唯一ID的第一个字母,

我已经查看了其他问题和解决方案,但它们不满足我的要求。 试图以更多时间优化的方式进行此操作,从我到目前为止的重新分区来看,这不是一个不错的选择。

尝试使用maxRecordSperfile选项编写DF,但我似乎无法控制文件的命名。

df.write.mode("overwrite")
.option("maxRecordsPerFile", 1)
.json(outputPath)

我很陌生,可以引发任何帮助。

I have a fairly large dataframe(million rows), and the requirement is to store each of the row in a separate json file.

For this data frame

 root
 |-- uniqueID: string 
 |-- moreData: array 

The output should be stored like below for all the rows.

s3://.../folder[i]/<uniqueID>.json

where i is the first letter of the uniqueID

I have looked at other questions and solutions, but they don't satisfy my requirements.
Trying to do this in a more time optimized way, and from what I have read so far re-partition is not a good option.

Tried writing the df with maxRecordsPerFile option, but I can't seem to control the naming of the files.

df.write.mode("overwrite")
.option("maxRecordsPerFile", 1)
.json(outputPath)

I am fairly new to spark, any help is much appreciated.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

不即不离 2025-02-09 21:00:37

我认为这样做的方法没有真正的优化(如果我们认为这比其他任何其他方法都要快得多”)。从根本上讲,这是一个效率低下的操作,而且我真的看不到一个很好的用例。但是,假设您真的已经考虑过并决定这是解决手头问题的最佳方法,我建议您在数据框架上使用repartition方法重新考虑;它可以用作列用作分区表达式。它唯一不会做的就是按照您想要的方式将文件分开。

我想这样的事情可能会起作用:

import java.io.File
import scala.reflect.io.Directory

// dummy data
val df = Seq(("A", "B", "XC"), ("D", "E", "YF"), ("G", "H", "ZI"), ("J", "K", "ZL"), ("M", "N", "XO")).toDF("FOO", "BAR", "BAZ")

// List of all possible prefixes for the index column. If you need to generate this
// from the data, replace this with a query against the input dataframe to do that.
val prefixes = List("X", "Y", "Z")

// replace with your path
val path = "/.../data"

prefixes.foreach{p =>
  val data = df.filter(col("uniqueID").startsWith(p))
  val path = new Directory(new File(f"$path/$p"))
  data.repartition(data.count.toInt) // repartition the dataframe with 1 record per partition
  data.write.format("json").save(path)
}

以上不完全满足要求,因为您无法控制输出文件名 1 。我们可以使用shell脚本来
之后修复文件名。这是假设您正在使用bashJQ的环境中运行。

#!/usr/bin/env bash

# replace with the path that contains the directories to process
cd /.../data

for sub_data_dir in ./*; do
  cd "${sub_data_dir}"
  rm _SUCCESS
  for f in ./part-*.json; do
    uuid="$(jq -r ."uniqueID" "${f}")"
    mv "${f}" "${uuid}"
  done
  cd ..
done

1:使用dataframe.write时,Spark并没有为您提供控制单个文件名的选项,因为这不是要使用的方式。预期的用法是在多节点的hadop群集上,其中可以在节点之间任意分布数据。 写入操作在所有节点之间进行协调,并针对共享HDFS上的路径。在这种情况下,谈论单个文件没有意义方法)

I don't think there is really an optimized (if we take that to mean "much faster than any other") method of doing this. It's fundamentally an inefficient operation, and one that I can't really see a good use case for. But, assuming you really have thought this through and decided this is the best way to solve the problem at hand, I would suggest you reconsider using the repartition method on the dataframe; it can take a column to be used as the partitioning expression. The only thing it won't do is split files across directories the way you want.

I suppose something like this might work:

import java.io.File
import scala.reflect.io.Directory

// dummy data
val df = Seq(("A", "B", "XC"), ("D", "E", "YF"), ("G", "H", "ZI"), ("J", "K", "ZL"), ("M", "N", "XO")).toDF("FOO", "BAR", "BAZ")

// List of all possible prefixes for the index column. If you need to generate this
// from the data, replace this with a query against the input dataframe to do that.
val prefixes = List("X", "Y", "Z")

// replace with your path
val path = "/.../data"

prefixes.foreach{p =>
  val data = df.filter(col("uniqueID").startsWith(p))
  val path = new Directory(new File(f"$path/$p"))
  data.repartition(data.count.toInt) // repartition the dataframe with 1 record per partition
  data.write.format("json").save(path)
}

The above doesn't quite meet the requirement since you can't control the output file name1. We can use a shell script to
fix the file names afterward. This assumes you are running in an environment with bash and jq available.

#!/usr/bin/env bash

# replace with the path that contains the directories to process
cd /.../data

for sub_data_dir in ./*; do
  cd "${sub_data_dir}"
  rm _SUCCESS
  for f in ./part-*.json; do
    uuid="$(jq -r ."uniqueID" "${f}")"
    mv "${f}" "${uuid}"
  done
  cd ..
done

1: Spark doesnt give you an option to control individual file names when using Dataframe.write because that isn't how it is meant to be used. The intended usage is on a multi-node Hadoop cluster where data may be distributed arbitrarily between the nodes. The write operation is coordinated among all nodes and targets a path on the shared HDFS. In that case it makes no sense to talk about individual files because the operation is performed on the dataframe level, and so you can only control the naming of the directory where the output files will be written (as the argument to the save method)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文