Hadoop/MapReduce: Reading and writing classes generated from DDL

Published 2024-09-01 09:53:29


Can someone walk me though the basic work-flow of reading and writing data with classes generated from DDL?

I have defined some struct-like records using DDL. For example:

  class Customer {
     ustring FirstName;
     ustring LastName;
     ustring CardNo;
     long LastPurchase;
  }

I've compiled this to get a Customer class and included it into my project. I can easily see how to use this as input and output for mappers and reducers (the generated class implements Writable), but not how to read and write it to file.

The JavaDoc for the org.apache.hadoop.record package talks about serializing these records in Binary, CSV or XML format. How do I actually do that? Say my reducer produces IntWritable keys and Customer values. What OutputFormat do I use to write the result in CSV format? What InputFormat would I use to read the resulting files in later, if I wanted to perform analysis over them?

1 answer

明月夜 2024-09-08 09:53:29


OK, so I think I have this figured out. I'm not sure it is the most straightforward way, so please correct me if you know a simpler workflow.

Every class generated from DDL implements the Record interface, and consequently provides two methods:

serialize(RecordOutput out) for writing
deserialize(RecordInput in) for reading

RecordOutput and RecordInput are utility interfaces provided in the org.apache.hadoop.record package. There are a few implementations of each (e.g. XmlRecordOutput, BinaryRecordOutput, CsvRecordOutput).
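To see what these implementations do, you can drive serialize/deserialize by hand, outside any MapReduce job. The sketch below is illustrative: it assumes the rcc-generated Customer class (with get/set methods named after its DDL fields) and the Hadoop record package are on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.record.CsvRecordInput;
import org.apache.hadoop.record.CsvRecordOutput;

// Illustrative sketch: round-trip a generated Record through the CSV
// serializers directly. Customer and its accessors are assumed to be
// the classes rcc generated from the DDL in the question.
public class CustomerCsvRoundTrip {

    static String toCsv(Customer c) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        c.serialize(new CsvRecordOutput(bytes)); // one CSV line per record
        return bytes.toString("UTF-8");
    }

    static Customer fromCsv(String csv) throws IOException {
        Customer c = new Customer();
        c.deserialize(new CsvRecordInput(
            new ByteArrayInputStream(csv.getBytes("UTF-8"))));
        return c;
    }

    public static void main(String[] args) throws IOException {
        Customer c = new Customer();
        c.setFirstName("Jane");
        c.setLastName("Doe");
        c.setCardNo("12345");
        c.setLastPurchase(1234567890L);
        System.out.println(toCsv(c));
    }
}
```

The same round trip works with BinaryRecordOutput/BinaryRecordInput and XmlRecordOutput/XmlRecordInput by swapping the stream classes.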

As far as I know, you have to implement your own OutputFormat or InputFormat classes to use these. This is fairly easy to do.

For example, the OutputFormat I talked about in the original question (one that writes Integer keys and Customer values in CSV format) would be implemented like this:


  private static class CustomerOutputFormat
      extends TextOutputFormat<IntWritable, Customer> {

    public RecordWriter<IntWritable, Customer> getRecordWriter(FileSystem ignored,
        JobConf job, String name, Progressable progress) throws IOException {
      Path file = FileOutputFormat.getTaskOutputPath(job, name);
      FileSystem fs = file.getFileSystem(job);
      FSDataOutputStream fileOut = fs.create(file, progress);
      return new CustomerRecordWriter(fileOut);
    }

    protected static class CustomerRecordWriter
        implements RecordWriter<IntWritable, Customer> {

      protected DataOutputStream outStream;

      public CustomerRecordWriter(DataOutputStream out) {
        this.outStream = out;
      }

      public synchronized void write(IntWritable key, Customer value) throws IOException {
        // Write the key, then let the generated Record serialize itself
        // onto the same CSV line.
        CsvRecordOutput csvOutput = new CsvRecordOutput(outStream);
        csvOutput.writeInt(key.get(), "id");
        value.serialize(csvOutput);
      }

      public synchronized void close(Reporter reporter) throws IOException {
        outStream.close();
      }
    }
  }

Creating the InputFormat is much the same. Because the CSV format is one entry per line, we can use a LineRecordReader internally to do most of the work.



private static class CustomerInputFormat extends FileInputFormat<IntWritable, Customer> {

  public RecordReader<IntWritable, Customer> getRecordReader(
      InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException {
    reporter.setStatus(genericSplit.toString());
    return new CustomerRecordReader(job, (FileSplit) genericSplit);
  }

  private static class CustomerRecordReader implements RecordReader<IntWritable, Customer> {

    private final LineRecordReader lrr;

    public CustomerRecordReader(Configuration job, FileSplit split) throws IOException {
      this.lrr = new LineRecordReader(job, split);
    }

    public IntWritable createKey() {
      return new IntWritable();
    }

    public Customer createValue() {
      return new Customer();
    }

    public synchronized boolean next(IntWritable key, Customer value) throws IOException {
      LongWritable offset = new LongWritable();
      Text line = new Text();

      if (!lrr.next(offset, line))
        return false;

      // Re-parse the raw line with the record package's CSV reader:
      // first the key written under the "id" tag, then the record fields.
      CsvRecordInput cri = new CsvRecordInput(
          new ByteArrayInputStream(line.toString().getBytes("UTF-8")));
      key.set(cri.readInt("id"));
      value.deserialize(cri);

      return true;
    }

    public float getProgress() {
      return lrr.getProgress();
    }

    public synchronized long getPos() throws IOException {
      return lrr.getPos();
    }

    public synchronized void close() throws IOException {
      lrr.close();
    }
  }
}
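Once both classes exist (made package-visible rather than private, so the framework can instantiate them), wiring them into a job with the old org.apache.hadoop.mapred API looks roughly like the fragment below. CustomerJob, the job name, and the paths are placeholder names, not from the original question.

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Rough job-configuration fragment; CustomerJob and the paths are
// placeholders, and the mapper/reducer setup is elided.
JobConf conf = new JobConf(CustomerJob.class);
conf.setJobName("customer-csv");

// The reducer emits IntWritable keys and Customer values.
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(Customer.class);

// Plug in the custom formats defined above.
conf.setInputFormat(CustomerInputFormat.class);
conf.setOutputFormat(CustomerOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path("/in"));
FileOutputFormat.setOutputPath(conf, new Path("/out"));

JobClient.runJob(conf);
```

With this in place, a second job pointed at /out via CustomerInputFormat can read the CSV files back for further analysis.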
