MapReduce word count example

Published 2024-11-10 10:15:24 · 2,032 characters · 0 views · 0 comments


My question is about MapReduce programming in Java.

Suppose I have the WordCount.java example, a standard MapReduce program. I want the map function to collect some information and return it to the reduce function as pairs of the form <slaveNode_id, some_info_collected>,

so that I can know which slave node collected which data. Any idea how?

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          output.collect(word, one);
        }
      }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
      }
    }

    public static void main(String[] args) throws Exception {
      JobConf conf = new JobConf(WordCount.class);
      conf.setJobName("wordcount");

      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);

      conf.setMapperClass(Map.class);
      conf.setCombinerClass(Reduce.class);
      conf.setReducerClass(Reduce.class);

      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
    }
}
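For readers without a cluster at hand, the map → shuffle → reduce data flow of the job above can be simulated in plain Java. This is only an illustrative sketch (the class and method names are mine, not Hadoop's); the real framework distributes the map calls across nodes and groups the pairs by key during the shuffle.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A plain-Java sketch of the WordCount data flow; no Hadoop required.
public class WordCountSim {
    // map phase: emit a (word, 1) pair for every token in the line
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.split("\\s+")) {
            if (!token.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    // shuffle + reduce phase: group the pairs by key, then sum each group
    static Map<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            counts.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : new String[] {"hello world", "hello hadoop"}) {
            pairs.addAll(map(line));
        }
        System.out.println(reduce(pairs)); // {hadoop=1, hello=2, world=1}
    }
}
```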

Thank you!!

Comments (2)

意中人 2024-11-17 10:15:24


What you are asking is to let the application (your map-reduce job) know about the infrastructure it runs on.

In general the answer is that your application doesn't need this information. Each call to the Mapper and each call to the Reducer can be executed on a different node, or all on the same node. The beauty of MapReduce is that the outcome is the same either way, so for your application it doesn't matter.

As a consequence, the API doesn't have features to support this request of yours.

Have fun learning Hadoop :)


P.S. The only way I can think of (which is nasty, to say the least) is to include a system call of some sort in the Mapper and ask the underlying OS for its name/properties/etc.
This kind of construct would make your application very non-portable; i.e. it won't run on Hadoop on Windows or on Amazon.
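For completeness, a minimal sketch of that nasty approach: ask the JVM for the local host name instead of shelling out to the OS. The class and method names here are illustrative, not part of any Hadoop API; in an old-API Mapper you might call this once in configure() and reuse the value as the emitted key.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative helper: resolve the name of the node this JVM is running on.
public class NodeInfo {
    public static String localHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // fall back to a placeholder rather than failing the task
            return "unknown-host";
        }
    }

    public static void main(String[] args) {
        System.out.println(localHostName());
    }
}
```

Note that this still ties the output to whatever the node's resolver reports, which is exactly the portability problem described above.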

故事↓在人 2024-11-17 10:15:24


WordCount is the wrong example for you. You simply want to merge all the information together, which inverts what word count does.

Basically you just emit your slaveNode_id as an IntWritable (if that is possible) and the collected information as Text.

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
  private Text word = new Text();

  public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    String line = value.toString();
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      // you have to split your data here: ID and value
      IntWritable id = new IntWritable(YOUR_ID_HERE);

      output.collect(id, word);
    }
  }
}

And the reducer would go the same way:

public static class Reduce extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
  public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    // now you have all the values for a slaveID as key. Do whatever you like with that...
    // note: the old-API reduce() receives an Iterator, not an Iterable, so a for-each loop won't compile
    while (values.hasNext()) {
      output.collect(key, values.next());
    }
  }
}
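Since the map output types change from <Text, IntWritable> to <IntWritable, Text>, the driver from the question would also need adjusting. A sketch of the relevant JobConf lines, assuming the same old org.apache.hadoop.mapred API (the job name here is made up):

```java
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("slave-info");

conf.setOutputKeyClass(IntWritable.class);   // was Text.class
conf.setOutputValueClass(Text.class);        // was IntWritable.class

conf.setMapperClass(Map.class);
// conf.setCombinerClass(Reduce.class);      // removed: this reduce just forwards values, so a combiner gains nothing
conf.setReducerClass(Reduce.class);
```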