Hadoop MapReduce job with HDFS input and HBase output
I'm new to Hadoop.
I have a MapReduce job which is supposed to get its input from HDFS and write the reducer's output to HBase. I haven't found any good example.
Here's the code. The error when running this example is "Type mismatch in map, expected ImmutableBytesWritable, received IntWritable".
Mapper Class
public static class AddValueMapper extends Mapper<LongWritable,
    Text, ImmutableBytesWritable, IntWritable> {

  /* input  <key: line number, value: full line>
   * output <key: log key, value: int value> */
  public void map(LongWritable key, Text value,
      Context context) throws IOException,
      InterruptedException {
    String line = value.toString();
    int pos = line.indexOf("=");
    // Key part: everything before '='
    String p1 = line.substring(0, pos).trim();
    byte[] outKey = Bytes.toBytes(p1);
    // Value part: everything after '='
    String p2 = line.substring(pos + 1).trim();
    int outValue = Integer.parseInt(p2);
    context.write(new ImmutableBytesWritable(outKey),
        new IntWritable(outValue));
  }
}
Reducer Class
public static class AddValuesReducer extends TableReducer<
    ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

  public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values,
      Context context) throws IOException, InterruptedException {
    long total = 0;
    // Sum all values for this key
    for (IntWritable val : values) {
      total += val.get();
    }
    // Put to HBase: row key = reduce key, column family "data", qualifier "total"
    Put put = new Put(key.get());
    put.add(Bytes.toBytes("data"), Bytes.toBytes("total"),
        Bytes.toBytes(total));
    context.write(key, put);
  }
}
I had a similar job that used only HDFS, and it works fine.
Edited 18-06-2013: the college project finished successfully two years ago. For the job configuration (driver part), check the correct answer.
4 Answers
Here is the code that will solve your problem:
Driver
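A minimal driver sketch, assuming the AddValueMapper and AddValuesReducer from the question; the Driver class name, the job name, and the target table "tab_name" are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Driver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "hdfs-to-hbase");
    job.setJarByClass(Driver.class);

    // Map phase: read text lines from HDFS.
    job.setMapperClass(AddValueMapper.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));

    // Declare the map output types explicitly so they match what
    // AddValueMapper actually emits.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Reduce phase: wire the TableReducer to the target HBase table;
    // this also sets the output format.
    TableMapReduceUtil.initTableReducerJob("tab_name", AddValuesReducer.class, job);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}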
Mapper & Reducer
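The mapper and reducer from the question can be reused unchanged; in outline (signatures only):

public static class AddValueMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, IntWritable> {
  // map() as in the question
}

public static class AddValuesReducer
    extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {
  // reduce() as in the question
}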
Not sure why the HDFS version works: normally you have to set the input format for the job, and FileInputFormat is an abstract class. Perhaps you left some lines out?
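A sketch of the kind of lines meant (TextInputFormat and the input path are assumptions):

// Tell the job how to read its input from HDFS.
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path("/user/hadoop/input"));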
The best and fastest way to bulk-load data into HBase is to use HFileOutputFormat and the completebulkload utility. You will find sample code here:
Hope this will be useful :)
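In outline, the approach looks like this (a sketch; it assumes a Configuration conf and a configured Job job whose mappers emit Put or KeyValue values, with "tab_name" and /tmp/hfiles as placeholders, against the HBase 0.94-era org.apache.hadoop.hbase.mapreduce API):

HTable table = new HTable(conf, "tab_name");

// Make the job write region-sorted HFiles instead of live Puts.
HFileOutputFormat.configureIncrementalLoad(job, table);
FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));
job.waitForCompletion(true);

// Move the generated HFiles into the table's regions; this is what
// the completebulkload command-line tool does.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path("/tmp/hfiles"), table);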
Change this to ImmutableBytesWritable, IntWritable. I am not sure.. hope it works.
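Presumably this refers to the map output type declarations in the driver; a sketch:

job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(IntWritable.class);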