Hadoop MapReduce job with HDFS input and HBase output


I'm new to Hadoop. I have a MapReduce job which is supposed to read its input from HDFS and write the reducer's output to HBase. I haven't found any good example of this.

Here's the code. The error when running this example is "Type mismatch in map, expected ImmutableBytesWritable, received IntWritable".

Mapper Class

public static class AddValueMapper extends Mapper<LongWritable,
    Text, ImmutableBytesWritable, IntWritable> {

  /* input:  <key: line offset, value: full line "logKey=number">
   * output: <key: log key,     value: parsed integer> */
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // The locals must not redeclare the parameters `key` and `value`;
    // shadowing them as in the original code does not even compile.
    String line = value.toString();
    int pos = line.indexOf("=");

    // Key part: everything before the '='
    String p1 = line.substring(0, pos).trim();
    byte[] outKey = Bytes.toBytes(p1);

    // Value part: everything after the '='
    String p2 = line.substring(pos + 1).trim();
    int outValue = Integer.parseInt(p2);

    context.write(new ImmutableBytesWritable(outKey), new IntWritable(outValue));
  }
}

Reducer Class

public static class AddValuesReducer extends TableReducer<
    ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  @Override
  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {

    long total = 0;
    // Sum all values for this key; iterate the Iterable once instead of
    // requesting a fresh iterator on every loop pass.
    for (IntWritable v : values) {
      total += v.get();
    }

    // Write the sum to HBase: row = key, column = data:total
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("data"), Bytes.toBytes("total"), Bytes.toBytes(total));
    context.write(key, put);
  }
}

I had a similar job that used only HDFS, and it worked fine.

Edited 18-06-2013: the college project finished successfully two years ago. For the job configuration (driver part), see the accepted answer below.
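
For reference, a minimal driver sketch that matches the mapper and reducer above; the job name, the table name "mytable", the input path, and the AddValueDriver class are assumptions, not from the original post:

Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "hdfs-to-hbase");
job.setJarByClass(AddValueDriver.class);          // assumed enclosing class
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(AddValueMapper.class);
// Must match what AddValueMapper.map() actually writes; a mismatch here
// produces exactly the reported "Type mismatch in map" error.
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("/input"));   // placeholder path
// Binds the reducer to the target table and sets TableOutputFormat.
TableMapReduceUtil.initTableReducerJob("mytable", AddValuesReducer.class, job);
job.waitForCompletion(true);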

Comments (4)

属性 2024-10-16 06:58:13

Here is the code that will solve your problem:



Driver

Configuration conf = HBaseConfiguration.create(); // create() returns a plain Configuration
Job job = new Job(conf, "JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
// Registers yourReducer, sets TableOutputFormat and the target table in
// one call, so a separate job.setReducerClass() is not needed.
TableMapReduceUtil.initTableReducerJob(TABLE, yourReducer.class, job);
job.waitForCompletion(true);


Mapper & Reducer

class yourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  // @Override map()
}

class yourReducer extends TableReducer<Text, IntWritable,
    ImmutableBytesWritable> {
  // @Override reduce()
}

凉墨 2024-10-16 06:58:13

Not sure why the HDFS version works: normally you have to set the input format for the job, and FileInputFormat is an abstract class. Perhaps you left some lines out, such as:

job.setInputFormatClass(TextInputFormat.class);
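
A likely reason the HDFS-only version worked anyway is that TextInputFormat is the default input format in the mapreduce API, so the job falls back to it when none is set. Setting it explicitly in the driver makes the intent clear (a sketch; inputPath as in the accepted answer):

job.setInputFormatClass(TextInputFormat.class); // concrete subclass of FileInputFormat
FileInputFormat.setInputPaths(job, new Path(inputPath));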
℉絮湮 2024-10-16 06:58:13

The best and fastest way to bulk-load data into HBase is to use HFileOutputFormat and the completebulkload utility (LoadIncrementalHFiles).

You will find sample code here.

Hope this will be useful :)
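
A minimal sketch of that two-step flow, assuming a target table "mytable", placeholder paths, and a mapper that emits <ImmutableBytesWritable, Put>; all names here are assumptions, and it uses the old HTable-based API to match the classes named above:

Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "bulkload-prepare");
job.setJarByClass(BulkLoadDriver.class);        // assumed driver class
job.setMapperClass(HFilePrepareMapper.class);   // assumed; emits <ImmutableBytesWritable, Put>
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
FileInputFormat.setInputPaths(job, new Path("/input"));
FileOutputFormat.setOutputPath(job, new Path("/hfiles"));

HTable table = new HTable(conf, "mytable");
// Step 1: configure the partitioner and HFileOutputFormat so the job
// writes HFiles that line up with the table's existing regions.
HFileOutputFormat.configureIncrementalLoad(job, table);
job.waitForCompletion(true);

// Step 2: atomically move the generated HFiles into the live table;
// this is what the completebulkload command does from the shell.
new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/hfiles"), table);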

往事风中埋 2024-10-16 06:58:13
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {

Change this to ImmutableBytesWritable, IntWritable.

I am not sure... hope it works.
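
If the suggestion means declaring the map output types in the driver, the two lines would look like this (a guess at the intent; the job variable is as in the accepted answer):

// Must agree with the types the mapper's context.write() emits.
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);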
