How to re-run the whole map/reduce in Hadoop before the job finishes?

Posted on 2024-11-02 05:00:05


I am using Hadoop Map/Reduce with Java.

Suppose I have completed a whole map/reduce job. Is there any way I can repeat just the map/reduce part, without ending the job? I mean, I DON'T want to chain different jobs; I only want the map/reduce part to repeat.

Thank you!


Comments (1)

黑寡妇 2024-11-09 05:00:05


So I am more familiar with the Hadoop streaming API, but the approach should translate to the native Java API.

As I understand it, what you want to do is run several iterations of the same map() and reduce() operations over the input data.

Let's say your initial map() input comes from the file input.txt and the output file is output{iteration}.txt (where iteration is the loop count, with iteration in [0, number of iterations)).
On the second invocation of map()/reduce(), your input file is output{iteration} and your output file becomes output{iteration + 1}.txt.
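
A minimal sketch of that driver loop might look like the following (the names input.txt, output and numIterations are just placeholders for illustration, not taken from any particular API):

    // Illustrative sketch of the path rotation described above; the names
    // input.txt, output and numIterations are placeholders.
    Path in = new Path("input.txt");
    for (int iteration = 0; iteration < numIterations; iteration++) {
        Path out = new Path("output" + iteration + ".txt");
        // ... configure and run a Job that reads `in` and writes `out` ...
        in = out;  // the next pass consumes this pass's output
    }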

Let me know if this is not clear; I can put together a quick example and post a link here.

EDIT: For Java, I modified the Hadoop wordcount example to run multiple times:

package com.rorlig;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountJob {

  // Mapper: emits (word, 1) for every token in the input.
  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer: sums the counts emitted for each word.
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    if (args.length != 3) {
      System.err.println("Usage: wordcount <in> <out> <iterations>");
      System.exit(2);
    }
    int iterations = Integer.parseInt(args[2]);
    Path inPath = new Path(args[0]);
    Path outPath = null;

    // Run the same map/reduce pass <iterations> times. Pass i writes to
    // <out>i, and the next pass reads that output as its input.
    for (int i = 0; i < iterations; ++i) {
      outPath = new Path(args[1] + i);
      Job job = new Job(conf, "word count");
      job.setJarByClass(WordCountJob.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setCombinerClass(IntSumReducer.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, inPath);
      FileOutputFormat.setOutputPath(job, outPath);
      job.waitForCompletion(true);  // block until this pass finishes
      inPath = outPath;             // next pass consumes this pass's output
    }
  }
}
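
If it helps, a run of this might look something like the following (the jar name and paths are just assumptions about your setup):

    hadoop jar wordcount.jar com.rorlig.WordCountJob <in> <out> 3

With 3 iterations, this writes <out>0, <out>1 and <out>2, each pass reading the previous pass's output.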

Hope this helps
