Writing to HBase in MapReduce using MultipleOutputs

Posted 2024-11-07 12:54:10

I currently have a MapReduce job that uses MultipleOutputs to send data to several HDFS locations. After that completes, I am using HBase client calls (outside of MR) to add some of the same elements to a few HBase tables. It would be nice to add the HBase outputs as just additional MultipleOutputs, using TableOutputFormat. In that way, I would distribute my HBase processing.

Problem is, I cannot get this to work. Has anyone ever used TableOutputFormat in MultipleOutputs...? With multiple HBase outputs?

Basically, I am setting up my collectors like this....

OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector1 = multipleOutputs.getCollector("hbase1", reporter);
OutputCollector<ImmutableBytesWritable, Writable> hbaseCollector2 = multipleOutputs.getCollector("hbase2", reporter);

Put put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata1);
hbaseCollector1.collect(new ImmutableBytesWritable(mykey.getBytes()), put);

put = new Put(mykey.getBytes());
put.add("family".getBytes(), "column".getBytes(), somedata2);
hbaseCollector2.collect(new ImmutableBytesWritable(mykey.getBytes()), put);
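
For context, the two named outputs get registered in the driver with something like the following (a rough sketch of the old mapred API, not my exact driver code; jobConf stands for whatever JobConf the job is built from):

MultipleOutputs.addNamedOutput(jobConf, "hbase1", TableOutputFormat.class,
    ImmutableBytesWritable.class, Put.class);
MultipleOutputs.addNamedOutput(jobConf, "hbase2", TableOutputFormat.class,
    ImmutableBytesWritable.class, Put.class);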

This seems to follow the general pattern for writing to HBase, I think.

Part of the issue, as I type this, might be more in the job definition. It looks like MR (and HBase) want a global parameter set, like this....

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");

to provide the table name. Trouble is, I have two tables....
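
As far as I can tell that key is a single, job-wide setting, so setting it a second time just overwrites the first value (the second table name here is only for illustration):

conf.set(TableOutputFormat.OUTPUT_TABLE, "articles");
conf.set(TableOutputFormat.OUTPUT_TABLE, "authors");   // overwrites "articles"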

Any ideas?

Thanks

Comments (3)

岁月蹉跎了容颜 2024-11-14 12:54:10

I've put data into HBase 3 different ways. The most efficient (and distributed) was using the HFileOutputFormat class.

I set up the job as follows... (Please note this is edited from actual code, but core content remains)

// Configure the bulk-load job: the mapper emits (rowkey, Put) pairs and the
// output is written as HFiles rather than through the HBase client.
cubeBuilderETLJob.setJobName(jobName);
cubeBuilderETLJob.setMapOutputKeyClass(ImmutableBytesWritable.class);
cubeBuilderETLJob.setMapOutputValueClass(Put.class);
cubeBuilderETLJob.setMapperClass(HiveToHBaseMapper.class);
cubeBuilderETLJob.setJarByClass(CubeBuilderDriver.class);
cubeBuilderETLJob.setInputFormatClass(TextInputFormat.class);
cubeBuilderETLJob.setOutputFormatClass(HFileOutputFormat.class);
HFileOutputFormat.setOutputPath(cubeBuilderETLJob, cubeOutputPath);

// Point an HBase client configuration at the cluster's ZooKeeper ensemble.
Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

// configureIncrementalLoad() wires in the partitioner and reducer so the
// generated HFiles line up with the table's existing regions.
HTable hTable = new HTable(hConf, tableName);
HFileOutputFormat.configureIncrementalLoad(cubeBuilderETLJob, hTable);
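
One detail not shown above: HFileOutputFormat only writes HFiles under cubeOutputPath, so after the job completes they still need to be bulk-loaded into the table, for example with the LoadIncrementalHFiles utility (or the completebulkload command-line tool). A minimal sketch, reusing hConf, cubeOutputPath and hTable from the snippet above:

// Move the freshly written HFiles into the table's regions.
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hConf);
loader.doBulkLoad(cubeOutputPath, hTable);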

As we can see, my Mapper class is called HiveToHBaseMapper - Nice and original. :) Here's the (again, rough) definition of it

public class HiveToHBaseMapper extends
        Mapper<WritableComparable, Writable, ImmutableBytesWritable, Put> {

    @Override
    public void map(WritableComparable key, Writable val, Context context)
            throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String family = config.get("FAMILY");

        // sValue, column and generateKey() come from parsing the Hive record
        // and the job config in the real code; that part is elided from this
        // rough version.
        Double value = Double.parseDouble(sValue);
        String sKey = generateKey(config);
        byte[] bKey = Bytes.toBytes(sKey);

        Put put = new Put(bKey);
        // Non-positive values are stored as a Double.MIN_VALUE sentinel.
        put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
            ? Bytes.toBytes(Double.MIN_VALUE)
            : Bytes.toBytes(value));

        ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
        context.write(ibKey, put);
    }
}

I don't know if you can fit this into MultipleOutputs or if you'd need to create a new MR job, but it is the best way I've come across to get data into HBase. :)

This will hopefully get you in the right direction to finding a solution.

神妖 2024-11-14 12:54:10

In my experience, the best approach is to just put the data into the HBase table as soon as you have it available (unless you are bulk-loading data). If you have the data available in your map task, that is the best time to push it to HBase. If you don't have the data until a reduce task, then add the push to HBase there. Until you know that HBase is your bottleneck, let HBase worry about the caching issues.
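
A minimal sketch of that direct-write approach with the new-API Mapper (the table name, config key, column names, and row-key choice here are illustrative, not from the original answer):

public class DirectToHBaseMapper
        extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

    private HTable table;

    @Override
    protected void setup(Context context) throws IOException {
        // Open the table once per map task; the name comes from the job config.
        Configuration conf = context.getConfiguration();
        table = new HTable(conf, conf.get("target.hbase.table"));
        table.setAutoFlush(false); // let the HBase client buffer the puts
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Build a Put from the input record and hand it to the client buffer.
        Put put = new Put(Bytes.toBytes(value.toString()));
        put.add(Bytes.toBytes("family"), Bytes.toBytes("column"),
            Bytes.toBytes(value.toString()));
        table.put(put);
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        table.flushCommits(); // push anything still buffered
        table.close();
    }
}

Since nothing goes through context.write() here, the job itself can use NullOutputFormat (or keep whatever HDFS outputs it already has).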

阪姬 2024-11-14 12:54:10

So, apparently, this is not possible with the old mapred packages. There is a new OutputFormat in the mapreduce package, but I don't want to convert to that right now. So I will have to write multiple MR jobs.
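
For reference, the newer org.apache.hadoop.hbase.mapreduce package also includes MultiTableOutputFormat, which routes each Put to the table named by the output key, so a single job can write to several tables without MultipleOutputs at all. A minimal sketch of that approach, for whenever converting to the new API becomes an option (table and column names are illustrative):

// Driver: no single TableOutputFormat.OUTPUT_TABLE needs to be set.
job.setOutputFormatClass(MultiTableOutputFormat.class);

// Mapper/reducer: the key carries the destination table for each Put.
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("family"), Bytes.toBytes("column"), valueBytes);
context.write(new ImmutableBytesWritable(Bytes.toBytes("articles")), put);

put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes("family"), Bytes.toBytes("column"), valueBytes);
context.write(new ImmutableBytesWritable(Bytes.toBytes("authors")), put);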
