How to overwrite/reuse the existing output path for Hadoop jobs

Posted 2024-12-09 10:49:35

I want to overwrite/reuse the existing output directory when I run my Hadoop job daily.
Actually the output directory will store summarized output of each day's job run results.
If I specify the same output directory it gives the error "output directory already exists".

How to bypass this validation?

Comments (10)

子栖 2024-12-16 10:49:35

What about deleting the directory before you run the job?

You can do this via shell:

hadoop fs -rmr /path/to/your/output/

or via the Java API:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
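
For completeness, here is a minimal driver sketch showing the same delete wired in right before job submission; the class name, job name, and path are placeholders for illustration, not part of the original answer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DailySummaryDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "daily-summary");   // placeholder job name

        Path out = new Path("/path/to/your/output");
        FileSystem fs = out.getFileSystem(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);   // recursively remove last run's output
        }

        // ... set mapper/reducer classes, input paths, etc. ...
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}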
开始看清了 2024-12-16 10:49:35

Jungblut's answer is your direct solution. Since I personally never trust automated processes to delete stuff, I'll suggest an alternative:

Instead of trying to overwrite, I suggest you make the output name of your job dynamic, including the time in which it ran.

Something like "/path/to/your/output-2011-10-09-23-04/". This way you can keep around your old job output in case you ever need to revisit it. In my system, which runs 10+ daily jobs, we structure the output to be: /output/job1/2011/10/09/job1out/part-r-xxxxx, /output/job1/2011/10/10/job1out/part-r-xxxxx, etc.
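
A small sketch of how such a dated path could be built in the driver; the /output/job1 prefix and date format are illustrative assumptions, not from the answer:

// Inside the driver, after the Job has been created:
String day = new SimpleDateFormat("yyyy/MM/dd").format(new Date());   // e.g. 2011/10/09
FileOutputFormat.setOutputPath(job, new Path("/output/job1/" + day + "/" + "job1out"));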

萤火眠眠 2024-12-16 10:49:35

Hadoop's TextOutputFormat (which I guess you are using) does not allow overwriting an existing directory, probably to spare you the pain of finding out that you mistakenly deleted something you (and your cluster) worked very hard on.

However, if you are certain you want the job to overwrite your output folder, I believe the cleanest way is to change TextOutputFormat a little, like this:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      @Override
      public RecordWriter<K, V>
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed)
          {
              Class<? extends CompressionCodec> codecClass =
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          // overwrite=true: replace the file if it already exists
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed)
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          }
          else
          {
              return new LineRecordWriter<K, V>(
                      new DataOutputStream(codec.createOutputStream(fileOut)),
                      keyValueSeparator);
          }
      }
}

Now you are creating the FSDataOutputStream (fs.create(file, true)) with overwrite=true.
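
To use it, point the job at the custom format in the driver; a one-line sketch, assuming the class above is on the job's classpath and job is the usual org.apache.hadoop.mapreduce.Job:

// Emit output through the overwriting format instead of the stock TextOutputFormat
job.setOutputFormatClass(OverwriteTextOutputFormat.class);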

淡水深流 2024-12-16 10:49:35

Hadoop already supports the effect you seem to be trying to achieve by allowing multiple input paths to a job. Instead of trying to have a single directory of files to which you add more files, have a directory of directories to which you add new directories. To use the aggregate result as input, simply specify the input glob as a wildcard over the subdirectories (e.g., my-aggregate-output/*). To "append" new data to the aggregate as output, simply specify a new unique subdirectory of the aggregate as the output directory, generally using a timestamp or some sequence number derived from your input data (e.g. my-aggregate-output/20140415154424).
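
A sketch of that pattern in the driver; the paths and the timestamp format are illustrative assumptions, not from the answer:

// Read every previously written subdirectory of the aggregate (glob input)...
FileInputFormat.addInputPath(job, new Path("my-aggregate-output/*"));
// ...and write today's results into a new, unique subdirectory.
String stamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
FileOutputFormat.setOutputPath(job, new Path("my-aggregate-output/" + stamp));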

野の 2024-12-16 10:49:35

If one is loading the input file (e.g., with appended entries) from the local file system to the Hadoop Distributed File System like this:

hdfs dfs -put  /mylocalfile /user/cloudera/purchase

Then one could also overwrite/reuse the existing output directory with -f. There is no need to delete or re-create the folder:

hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase
溺深海 2024-12-16 10:49:35

Hadoop follows the philosophy of Write Once, Read Many times. Thus, when you try to write to the directory again, it assumes it has to make a new one (write once), but the directory already exists, so it complains. You can delete it via hadoop fs -rmr /path/to/your/output/. It's better to create a dynamic directory (e.g., based on a timestamp or hash value) in order to preserve data.

梅倚清风 2024-12-16 10:49:35

You can create a time-based output subdirectory for each execution. For example, let's say you take the output directory from the user and set it as follows:

FileOutputFormat.setOutputPath(job, new Path(args[1]));

Replace it with the following lines:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
木森分化 2024-12-16 10:49:35

I had a similar use case; I used MultipleOutputs to resolve it.

For example, I want different MapReduce jobs to write to the same directory /outputDir/: job 1 writes to /outputDir/job1-part1.txt, job 2 writes to /outputDir/job2-part2.txt (without deleting existing files).

In main, set the job's output directory to a throwaway one (it can be deleted before a new job runs):

FileOutputFormat.setOutputPath(job, new Path("/randomPath"));

In the reducer/mapper, use MultipleOutputs and set the writer to write to the desired directory:

// keep the MultipleOutputs instance in a field so reduce()/map() can use it
private MultipleOutputs mos;

public void setup(Context context) {
    mos = new MultipleOutputs(context);
}

and:

mos.write(key, value, "/outputDir/fileOfJobX.txt");

However, my use case was a bit more complicated than that. If you just need to write to the same flat directory, you can write to a different directory and run a script to migrate the files, like: hadoop fs -mv /tmp/* /outputDir

In my use case, each MapReduce job writes to different sub-directories based on the value of the message being written. The directory structure can be multi-layered, like:

/outputDir/
    messageTypeA/
        messageSubTypeA1/
            job1Output/
                job1-part1.txt
                job1-part2.txt
                ...
            job2Output/
                job2-part1.txt
                ...

        messageSubTypeA2/
        ...
    messageTypeB/
    ...

Each MapReduce job can write to thousands of sub-directories, and the cost of writing to a tmp dir and then moving each file to the correct directory is high.
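
For reference, a self-contained reducer sketch of this approach; the Text types, class name, and key-based routing rule are assumptions for illustration, not the author's actual job:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class RoutingReducer extends Reducer<Text, Text, Text, Text> {
    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Route each record into a sub-directory derived from its key;
            // MultipleOutputs appends the usual -r-xxxxx suffix to "part".
            mos.write(key, value, key.toString() + "/part");
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();   // flush and close the extra writers
    }
}

The important detail is calling mos.close() in cleanup(); otherwise the side outputs may not be flushed.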

伪心 2024-12-16 10:49:35

I encountered this exact problem; it stems from the exception raised in checkOutputSpecs in the class FileOutputFormat. In my case, I wanted many jobs to add files to directories that already exist, and I guaranteed that the files would have unique names.

I solved it by creating an output format class which overrides only the checkOutputSpecs method and swallows (ignores) the FileAlreadyExistsException that is thrown when it checks whether the directory already exists:

import java.io.IOException;

import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        } catch (FileAlreadyExistsException ignored) {
            // Swallow the "output directory already exists" exception
        }
    }
}

And in the job configuration, I used LazyOutputFormat and also MultipleOutputs:

LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
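
Putting the wiring together in the driver might look roughly like this; the output path is a placeholder, and any MultipleOutputs named outputs would be configured separately for your own job:

// LazyOutputFormat delays file creation until the first record is written
// (avoiding empty default part files), and the custom format above suppresses
// the existing-directory check.
LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/existing/output/dir"));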
寒冷纷飞旳雪 2024-12-16 10:49:35

You need to add the following in your main class:

// Configure the output path from the filesystem into the job
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
// Auto-delete the output dir (recursively) before the job runs
outputPath.getFileSystem(conf).delete(outputPath, true);