Computing the set intersection and set difference of the records of two files with Hadoop
Sorry for cross-posting this on the hadoop user mailing list and here, but this is becoming an urgent matter for me.
My problem is as follows:
I have two input files, and I want to determine
- a) The number of lines which only occur in file 1
- b) The number of lines which only occur in file 2
- c) The number of lines common to both (i.e., with regard to string equality)
Example:
File 1:
a
b
c
File 2:
a
d
Desired output for each case:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
Basically my approach is as follows:
I wrote my own LineRecordReader, so that the mapper receives a pair consisting of the line (text) and a byte indicating the source file (either 0 or 1).
The mapper simply emits the pair unchanged, so effectively it does nothing.
However, the side effect is that the combiner receives a
Map<Line, Iterable<SourceId>>
(where SourceId is either 0 or 1).
Now, for each line I can get the set of sources it appears in. Therefore, I could write a combiner that counts, for each case (a, b, c), the number of lines (Listing 1).
The combiner then outputs a 'summary' only on cleanup (is that safe?).
So this summary looks like:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
In the reducer I then only sum up the values for these summaries. (So the output of the reducer looks just like that of the combiner.)
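Such a summing reducer would be little more than the following sketch (not part of my original code; the types assume the Text/LongWritable summary records from Listing 1):

public static class SummaryReducer
        extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the partial counts emitted by the combiners.
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        context.write(key, new LongWritable(sum));
    }
}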
However, the main problem is that I need to treat both source files as a single virtual file that yields records of the form
(line, sourceId) // sourceId either 0 or 1
And I am not sure how to achieve that.
So the question is whether I can avoid preprocessing and merging the files beforehand, and do this on the fly with something like a virtually-merged-file reader and a custom record reader.
Any code example is much appreciated.
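For reference, one way to get such (line, sourceId) records without a custom record reader is MultipleInputs (org.apache.hadoop.mapreduce.lib.input.MultipleInputs), which binds a separate mapper to each input path. A minimal sketch, with made-up paths and class names:

public static class Source0Mapper
        extends Mapper<LongWritable, Text, Text, ByteWritable> {
    private static final ByteWritable SOURCE = new ByteWritable((byte) 0);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(line, SOURCE); // tag each line of file 1 with source id 0
    }
}

public static class Source1Mapper
        extends Mapper<LongWritable, Text, Text, ByteWritable> {
    private static final ByteWritable SOURCE = new ByteWritable((byte) 1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        context.write(line, SOURCE); // tag each line of file 2 with source id 1
    }
}

// Driver side: bind one mapper per input path.
MultipleInputs.addInputPath(job, new Path("/input/file1"),
        TextInputFormat.class, Source0Mapper.class);
MultipleInputs.addInputPath(job, new Path("/input/file2"),
        TextInputFormat.class, Source1Mapper.class);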
Best regards,
Claus
Listing 1:
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class SourceCombiner
        extends Reducer<Text, ByteWritable, Text, LongWritable> {

    private long countA = 0; // lines that occur in source 0
    private long countB = 0; // lines that occur in source 1
    private long countC = 0; // C = lines (c)ommon to both sources

    @Override
    public void reduce(Text key, Iterable<ByteWritable> values, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct source ids this line appeared in.
        Set<Byte> fileIds = new HashSet<Byte>();
        for (ByteWritable val : values) {
            fileIds.add(val.get());
        }
        if (fileIds.contains((byte) 0)) { ++countA; }
        if (fileIds.contains((byte) 1)) { ++countB; }
        if (fileIds.size() >= 2) { ++countC; }
    }

    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        // Emit one summary record per counter when the task finishes.
        context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
        context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
        context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
    }
}
1 Answer
Okay, I must admit that I didn't really catch the gist of what you've tried so far, but I have a simple approach to do the stuff you may need.
Have a look at the file mapper below. It grabs the file name and emits it together with each line of the input.
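A sketch of what that mapper could look like (untested; the key type assumes the default TextInputFormat, and FileSplit is org.apache.hadoop.mapreduce.lib.input.FileSplit):

public static class FileMapper
        extends Mapper<LongWritable, Text, Text, Text> {

    private final Text fileName = new Text();

    @Override
    protected void setup(Context context) {
        // The input split tells us which file this map task is reading.
        FileSplit split = (FileSplit) context.getInputSplit();
        fileName.set(split.getPath().getName());
    }

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // key = the line itself, value = the name of its source file
        context.write(line, fileName);
    }
}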
Now we have a bunch of key/value pairs that look like this (for your example, assuming the files are named File1 and File2):
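a    File1
b    File1
c    File1
a    File2
d    File2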
Obviously, reducing them groups the values by line, so the reducer input looks like this:
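a    [File1, File2]
b    [File1]
c    [File1]
d    [File2]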
What you need to do in your reducer could look like this (a sketch; the counter enum and the literal file name are placeholders):
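public static class FileReducer
        extends Reducer<Text, Text, Text, NullWritable> {

    // Job counters for the three cases.
    public enum LineCounter {
        ONLY_IN_FILE_1, ONLY_IN_FILE_2, IN_BOTH
    }

    @Override
    public void reduce(Text line, Iterable<Text> files, Context context)
            throws IOException, InterruptedException {
        // Collect the distinct file names this line occurred in.
        Set<String> sources = new HashSet<String>();
        for (Text file : files) {
            sources.add(file.toString());
        }
        if (sources.size() >= 2) {
            context.getCounter(LineCounter.IN_BOTH).increment(1);
        } else if (sources.contains("File1")) {
            context.getCounter(LineCounter.ONLY_IN_FILE_1).increment(1);
        } else {
            context.getCounter(LineCounter.ONLY_IN_FILE_2).increment(1);
        }
    }
}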
You have to replace the string inside the if statement with your file name.
I think that using job counters is a bit cleaner than keeping your own primitive counters and writing them to the context in cleanup. You can retrieve the counters for a job by calling something like this after it completes:
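// Sketch of client-side counter retrieval (Counters is org.apache.hadoop.mapreduce.Counters);
// assumes the FileReducer.LineCounter enum from the reducer above.
job.waitForCompletion(true);
Counters counters = job.getCounters();
long onlyInFile1 = counters.findCounter(FileReducer.LineCounter.ONLY_IN_FILE_1).getValue();
long onlyInFile2 = counters.findCounter(FileReducer.LineCounter.ONLY_IN_FILE_2).getValue();
long inBoth = counters.findCounter(FileReducer.LineCounter.IN_BOTH).getValue();
System.out.println("lines_in_both: " + inBoth);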
Nevertheless, if you need the number of common lines etc. as files in HDFS, then go for your solution.
Hope that helped you.