Computing the set intersection and set difference of the records of two files with Hadoop


Sorry for cross-posting this on the hadoop user mailing list and here, but this is becoming an urgent matter for me.

My problem is as follows:
I have two input files, and I want to determine

  • a) The number of lines which only occur in file 1
  • b) The number of lines which only occur in file 2
  • c) The number of lines common to both (i.e., with regard to string equality)

Example:

File 1:
a
b
c

File 2:
a
d

Desired output for each case:

lines_only_in_1: 2         (b, c)
lines_only_in_2: 1         (d)
lines_in_both:   1         (a)

Basically my approach is as follows:
I wrote my own LineRecordReader so that the mapper receives a pair consisting of the line (text) and a byte indicating the source file (either 0 or 1).
The mapper simply emits the pair again, so effectively it does nothing.
The side effect, however, is that the combiner receives a

Map<Line, Iterable<SourceId>>

(where SourceId is either 0 or 1).

Now, for each line I can get the set of sources it appears in. Therefore, I could write a combiner that counts, for each case (a, b, c), the number of lines (Listing 1).

The combiner then outputs a 'summary' only on cleanup (is that safe?).
So this summary looks like:

lines_only_in_1   2531
lines_only_in_2   3190
lines_in_both      901

In the reducer I then only sum up the values for these summaries. (So the output of the reducer looks just like that of the combiner.)
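
For completeness, the summing reducer I have in mind is essentially the following (just a sketch; imports as in Listing 1):

public static class SummarySumReducer
    extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the partial counts emitted by the combiners for this summary key
        long total = 0;
        for (LongWritable partial : values) {
            total += partial.get();
        }
        context.write(key, new LongWritable(total));
    }
}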

However, the main problem is that I need to treat both source files as a single virtual file that yields records of the form
(line, sourceId) // sourceId either 0 or 1

And I am not sure how to achieve that.
So the question is whether I can avoid preprocessing and merging the files beforehand, and do that on the fly with something like a virtually-merged-file reader and a custom record reader.
Any code example is much appreciated.
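
For reference, one direction I have been considering is MultipleInputs, where every input path is bound to its own small tagging mapper, so no merged file or custom record reader would be needed. A rough sketch (the paths and class names are only placeholders, not code I actually run):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class TaggingJob {

    // tags every line of the first file with source id 0
    public static class SourceZeroMapper
            extends Mapper<LongWritable, Text, Text, ByteWritable> {
        private final ByteWritable tag = new ByteWritable((byte) 0);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, tag);
        }
    }

    // tags every line of the second file with source id 1
    public static class SourceOneMapper
            extends Mapper<LongWritable, Text, Text, ByteWritable> {
        private final ByteWritable tag = new ByteWritable((byte) 1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, tag);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "tag lines with source id");
        job.setJarByClass(TaggingJob.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ByteWritable.class);

        // one mapper per input path; the framework handles the "virtual merge"
        MultipleInputs.addInputPath(job, new Path("/input/file1"),
                TextInputFormat.class, SourceZeroMapper.class);
        MultipleInputs.addInputPath(job, new Path("/input/file2"),
                TextInputFormat.class, SourceOneMapper.class);

        // combiner, reducer and output path would be set here as in Listing 1
    }
}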

Best regards,
Claus

Listing 1:

public static class SourceCombiner
    extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0;
    private long countB = 0;
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            byte fileId = val.get();

            fileIds.add(fileId);
        }

        if(fileIds.contains((byte)0)) { ++countA; }
        if(fileIds.contains((byte)1)) { ++countB; }
        if(fileIds.size() >= 2) { ++countC; }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}


Comments (1)

岁吢 2024-11-24 04:20:23


Okay, I must admit that I didn't really catch the gist of what you've tried so far, but here is a simple approach that may do what you need.

Have a look at the FileMapper below. It picks up the file name and emits it together with each line of the input.

    public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text fileName;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // emit the line itself as the key and the source file name as the value
            context.write(value, fileName);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // remember which input file this map task is reading
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            fileName = new Text(name);
        }
    }

Now we have a bunch of key/value pairs that look like this (with regard to your example):

    a File 1
    b File 1
    c File 1

    a File 2
    d File 2

Obviously, reducing them will give you input like this:

    a File 1,File 2
    b File 1
    c File 1
    d File 2

What you need to do in your reducer could look like this:

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        // if we have only two files and we have just two records in our hashset
        // the line is contained in both files
        if (set.size() == 2) {
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else {
            // sorry this is a bit dirty...
            String fileName = set.iterator().next();
            // determine which file it was by checking the name:
            if (fileName.equals("YOUR_FIRST_FILE_NAME")) {
                context.getCounter(Counter.LINES_IN_FIRST).increment(1);
            } else {
                context.getCounter(Counter.LINES_IN_SECOND).increment(1);
            }
        }
    }

}

You have to replace the string inside the if statement with your first file's name.
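
If you would rather not hard-code the name, one option is to pass the first file's name through the job configuration and read it in setup(). A sketch of that variant (the property name compare.first.file is my own placeholder):

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ConfiguredFileReducer extends Reducer<Text, Text, Text, Text> {

    enum Counter {
        LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND
    }

    private String firstFileName;

    @Override
    protected void setup(Context context) {
        // set in the driver via job.getConfiguration().set("compare.first.file", "file1.txt")
        firstFileName = context.getConfiguration().get("compare.first.file");
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HashSet<String> set = new HashSet<String>();
        for (Text t : values) {
            set.add(t.toString());
        }

        if (set.size() == 2) {
            // the line occurs in both files
            context.getCounter(Counter.LINES_IN_COMMON).increment(1);
        } else if (set.iterator().next().equals(firstFileName)) {
            context.getCounter(Counter.LINES_IN_FIRST).increment(1);
        } else {
            context.getCounter(Counter.LINES_IN_SECOND).increment(1);
        }
    }
}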

I think that using job counters is a bit clearer than keeping your own primitives and writing them to the context in cleanup. You can retrieve a job's counters after it has completed like this:

Job job = Job.getInstance(new Configuration());
//setup stuff etc omitted..
job.waitForCompletion(true);
// repeat the same line for the other enum values
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue();
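
And if it helps, a minimal driver wiring the two classes together might look roughly like this (the paths and the class name CompareFilesDriver are placeholders, not part of the code above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompareFilesDriver {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "compare two files");
        job.setJarByClass(CompareFilesDriver.class);

        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // both files are simply added to the same job
        FileInputFormat.addInputPath(job, new Path("/input/file1"));
        FileInputFormat.addInputPath(job, new Path("/input/file2"));
        FileOutputFormat.setOutputPath(job, new Path("/output/compare"));

        job.waitForCompletion(true);

        // read the counters back once the job has finished
        long inCommon = job.getCounters()
                .findCounter(FileReducer.Counter.LINES_IN_COMMON).getValue();
        long onlyInFirst = job.getCounters()
                .findCounter(FileReducer.Counter.LINES_IN_FIRST).getValue();
        long onlyInSecond = job.getCounters()
                .findCounter(FileReducer.Counter.LINES_IN_SECOND).getValue();

        System.out.println("lines_in_both:   " + inCommon);
        System.out.println("lines_only_in_1: " + onlyInFirst);
        System.out.println("lines_only_in_2: " + onlyInSecond);
    }
}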

Nevertheless, if you need the numbers of lines in common etc. stored in HDFS, then go for your solution.

Hope that helped you.
