FileInputFormat where the file name is the KEY and the text contents are the VALUE
I'd like to use an entire file as a single record for MAP processing, with the filename as the key.
I've read the following post: How to get Filename/File Contents as key/value input for MAP when running a Hadoop MapReduce Job? While the theory in the top answer is solid, no code or "how-to" is actually provided.
Here is my custom FileInputFormat and the corresponding RecordReader, which compile yet do not produce ANY record data.
Thanks for any help.
public class CommentsInput extends FileInputFormat<Text, Text> {

    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration());
    }
}
//////////////////////////////////////////////////
public class CommentFileRecordReader extends RecordReader<Text, Text> {

    private InputStream in;
    private long start;
    private long length;
    private long position;
    private Text key;
    private Text value;
    private boolean processed;
    private FileSplit fileSplit;
    private Configuration conf;

    public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException {
        this.fileSplit = fileSplit;
        this.conf = conf;
    }

    /** Boilerplate initialization code for file input streams. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        fileSplit = (FileSplit) split;
        this.start = fileSplit.getStart();
        this.length = fileSplit.getLength();
        this.position = 0;
        this.processed = false;

        Path path = fileSplit.getPath();
        FileSystem fs = path.getFileSystem(conf);
        FSDataInputStream in = fs.open(path);

        CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecs.getCodec(path);
        if (codec != null)
            this.in = codec.createInputStream(in);
        else
            this.in = in;

        // If using Writables:
        // key = new Text();
        // value = new Text();
    }

    public boolean next(Text key, Text value) throws IOException {
        if (!processed) {
            key = new Text(fileSplit.getPath().toString());
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            byte[] contents = new byte[(int) fileSplit.getLength()];
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents.toString());
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        // TODO parse the next key value, update position and return true.
        return false;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /** Returns our progress within the split, as a float between 0 and 1. */
    @Override
    public float getProgress() {
        if (length == 0)
            return 0.0f;
        return Math.min(1.0f, position / (float) length);
    }

    @Override
    public void close() throws IOException {
        if (in != null)
            in.close();
    }
}
Comments (1)
You need to find a way to define your own key class and make sure your classes use it. You can look up how to define your own key class, and you can get the file name by calling the getName() method on its path, then use it to make your key.
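For what it's worth, here is a minimal, untested sketch of how that suggestion could be wired into the reader from the question: the whole-file logic currently sitting in next() is moved into nextKeyValue(), which is the method the new mapreduce API actually calls, and the key is built from Path.getName() as described above. The class name WholeFileRecordReader and the field layout are illustrative, not taken from the original post.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Illustrative sketch: emits exactly one (file name, file contents) record per split.
public class WholeFileRecordReader extends RecordReader<Text, Text> {

    private FileSplit fileSplit;
    private Configuration conf;
    private final Text key = new Text();
    private final Text value = new Text();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;                          // only one record per file
        }
        Path file = fileSplit.getPath();
        key.set(file.getName());                   // file name as the key
        byte[] contents = new byte[(int) fileSplit.getLength()];
        FSDataInputStream in = null;
        try {
            FileSystem fs = file.getFileSystem(conf);
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);   // whole file as the value
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    @Override
    public Text getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    @Override
    public float getProgress() {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() {
        // nothing held open between calls in this sketch
    }
}

Two details worth noting. First, byte[].toString() in the original next() returns the array's identity string rather than the file contents, which is why the sketch uses Text.set(byte[], int, int) instead. Second, in the new org.apache.hadoop.mapreduce API, FileInputFormat.isSplitable takes a JobContext rather than a FileSystem, so the isSplitable override in CommentsInput above may never be consulted; that is worth double-checking against the Hadoop version in use. The format itself would then be selected in the driver with job.setInputFormatClass(CommentsInput.class).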