How to overwrite/reuse the existing output path for Hadoop jobs again and again
I want to overwrite/reuse the existing output directory when I run my Hadoop job daily.
Actually, the output directory will store the summarized output of each day's job run. If I specify the same output directory, it gives the error "output directory already exists".
How to bypass this validation?
10 Answers
What about deleting the directory before you run the job?
You can do this via shell:
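The command was stripped from this answer when the page was converted; given the path used elsewhere in the thread, it would be along the lines of:

```
# Recursively delete the old output directory before submitting the job.
# (hadoop fs -rmr is the older form; newer releases use hdfs dfs -rm -r)
hadoop fs -rmr /path/to/your/output
```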
or via the Java API:
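Likewise, the Java snippet is missing; a minimal sketch using the standard FileSystem API (the path is a placeholder) would be:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Delete the previous output directory (recursively) before running the job.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path output = new Path("/path/to/your/output");
if (fs.exists(output)) {
    fs.delete(output, true);   // true = delete recursively
}
```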
Jungblut's answer is your direct solution. Since I never trust automated processes to delete stuff (me personally), I'll suggest an alternative: instead of trying to overwrite, make the output name of your job dynamic, including the time at which it ran, something like /path/to/your/output-2011-10-09-23-04/. This way you can keep your old job output around in case you ever need to revisit it. In my system, which runs 10+ daily jobs, we structure the output as /output/job1/2011/10/09/job1out/part-r-xxxxx, /output/job1/2011/10/10/job1out/part-r-xxxxx, etc.

Hadoop's TextOutputFormat (which I guess you are using) does not allow overwriting an existing directory, probably to spare you the pain of finding out you mistakenly deleted something you (and your cluster) worked very hard on. However, if you are certain you want your output folder to be overwritten by the job, I believe the cleanest way is to change TextOutputFormat a little, like this:
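The code from the original answer was lost when the page was converted, so this is only a sketch of the idea it describes: a TextOutputFormat subclass whose record writer opens the part file with overwrite=true. The class name is invented; it would be registered in the driver with job.setOutputFormatClass, and, as a later answer notes, checkOutputSpecs may also need to be relaxed for a pre-existing directory to pass the submit-time check.

```java
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical sketch: a TextOutputFormat that opens its output file with overwrite = true.
public class OverwritingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, "");                // the usual part-r-xxxxx path
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        FSDataOutputStream out = fs.create(file, true);         // true = overwrite an existing file
        return new LineRecordWriter<K, V>(out);                 // plain key<TAB>value text lines
    }
}
```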
Now you are creating the FSDataOutputStream (fs.create(file, true)) with overwrite=true.

Hadoop already supports the effect you seem to be trying to achieve by allowing multiple input paths to a job. Instead of trying to have a single directory of files to which you add more files, have a directory of directories to which you add new directories. To use the aggregate result as input, simply specify the input glob as a wildcard over the subdirectories (e.g., my-aggregate-output/*). To "append" new data to the aggregate as output, simply specify a new unique subdirectory of the aggregate as the output directory, generally using a timestamp or some sequence number derived from your input data (e.g., my-aggregate-output/20140415154424).
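This answer gave no code; a minimal driver sketch of the pattern, assuming the my-aggregate-output layout above (the timestamp format is just an illustration):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// In the driver, assuming 'job' is the Job being configured:
// read every previous run's subdirectory as input...
FileInputFormat.addInputPath(job, new Path("my-aggregate-output/*"));
// ...and write this run's result into a new, uniquely named subdirectory.
String runId = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
FileOutputFormat.setOutputPath(job, new Path("my-aggregate-output/" + runId));
```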
If one is loading the input file (with, e.g., appended entries) from the local file system to the Hadoop distributed file system as such:
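The command itself was dropped during conversion; it was presumably an ordinary copy with the force flag, something like this (file and directory names are placeholders):

```
# -f overwrites the target file on HDFS if it already exists
hadoop fs -put -f input.txt /path/to/input/
```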
Then one could also overwrite/reuse the existing output directory with -f. No need to delete or re-create the folder.
Hadoop follows the philosophy of Write Once, Read Many times. Thus when you try to write to the directory again, it assumes it has to make a new one (write once), but it already exists, and so it complains. You can delete it via hadoop fs -rmr /path/to/your/output/. It's better to create a dynamic directory (e.g., based on a timestamp or hash value) in order to preserve data.
You can create an output subdirectory for each execution by time. For example, let's say you are expecting the output directory from the user and set it as follows:
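The original snippet is missing; the usual way of taking the output directory straight from the command line looks roughly like this (the argument index is an assumption):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Output directory taken directly from the user-supplied argument.
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```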
Change this to the following lines:
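Again only a sketch, with a made-up timestamp format; the point is that each run gets its own subdirectory under the user-supplied path:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Append a per-run timestamp so each execution writes to a fresh subdirectory.
String timestamp = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date());
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timestamp));
```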
I had a similar use case; I use MultipleOutputs to resolve this.

For example, if I want different MapReduce jobs to write to the same directory /outputDir/: job 1 writes to /outputDir/job1-part1.txt, job 2 writes to /outputDir/job1-part2.txt (without deleting existing files).

In the main, set the output directory to a random one (it can be deleted before a new job runs):
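The snippet did not survive the page conversion; the idea, sketched with a made-up scratch path, is roughly:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Point the job's "official" output at a throwaway location; the real files
// are written by MultipleOutputs into /outputDir/ (see the reducer below).
FileOutputFormat.setOutputPath(job, new Path("/tmp/job1-scratch"));
```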
In the reducer/mapper, use MultipleOutputs and set the writer to write to the desired directory:
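The reducer code is gone from the page; what follows is a hedged sketch of how MultipleOutputs can write to an absolute path outside the job's configured output directory (the class name, key/value types and the target file name are placeholders):

```java
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MyReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> out;

    @Override
    protected void setup(Context context) {
        out = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // An absolute baseOutputPath makes the file land outside the job's
            // scratch output directory, e.g. /outputDir/job1-part1-r-00000.
            out.write(NullWritable.get(), value, "/outputDir/job1-part1");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        out.close();  // flush and close all MultipleOutputs writers
    }
}
```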
However, my use case was a bit more complicated than that. If it's just a matter of writing to the same flat directory, you can write to a different directory and run a script to migrate the files, like:
hadoop fs -mv /tmp/* /outputDir
In my use case, each MapReduce job writes to different sub-directories based on the value of the message being written. The directory structure can be multi-layered, like:
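The author's actual layout was not preserved; a purely hypothetical example of such a multi-layered structure:

```
/outputDir/messageTypeA/2014/04/15/part-r-00000
/outputDir/messageTypeB/2014/04/15/part-r-00000
```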
Each MapReduce job can write to thousands of sub-directories, and the cost of writing to a tmp dir and then moving each file to the correct directory would be high.
I encountered this exact problem; it stems from the exception raised in checkOutputSpecs in the class FileOutputFormat. In my case, I wanted to have many jobs adding files to directories that already exist, and I guaranteed that the files would have unique names.

I solved it by creating an output format class which overrides only the checkOutputSpecs method and suppresses (ignores) the FileAlreadyExistsException that's thrown where it checks whether the directory already exists.

In the job configuration, I used LazyOutputFormat and also MultipleOutputs.
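The class from the answer was lost in conversion; a sketch of that approach (the class name is invented) might be:

```java
import java.io.IOException;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical sketch: accept a pre-existing output directory by ignoring the
// FileAlreadyExistsException raised in FileOutputFormat.checkOutputSpecs.
public class OverwriteSafeTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        } catch (FileAlreadyExistsException ignored) {
            // The directory already exists -- fine for this use case, because
            // every job writes files with unique names.
        }
    }
}
```

It could then be registered, for example, via LazyOutputFormat.setOutputFormatClass(job, OverwriteSafeTextOutputFormat.class), in line with the answer's mention of LazyOutputFormat.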
You need to add the setting in your main class:
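The setting itself was stripped from the page, so what follows is only a guess consistent with the rest of the thread: remove the old output path from the driver (main class) before submitting the job, so the "output directory already exists" check passes. The path is a placeholder.

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// In the driver's main(), before job.waitForCompletion(true):
Path output = new Path("/path/to/your/output");            // placeholder
FileSystem fs = FileSystem.get(job.getConfiguration());
if (fs.exists(output)) {
    fs.delete(output, true);                                // true = recursive
}
FileOutputFormat.setOutputPath(job, output);
```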