File not found when using DistributedCache on Hadoop
I am running into a FileNotFound problem with the code below. It is a simple test of DistributedCache, and I don't know what the problem is.
The path of the file is correct, but I cannot find the file on the datanode:
    package mapJoinTest2;

    /*
     * this is for map join using DistributedCache
     * using class Path to get cache file in datanode
     *
     * 2012.1.13
     * */

    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.*;

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.*;

    public class wordCount {

        public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

            private Text word = new Text();
            private Text mapKey = new Text();
            private Path[] localFile = new Path[1];
            private FileSystem fs;

            public void configure(JobConf job) {
                try {
                    fs = FileSystem.getLocal(new Configuration());
                    localFile = DistributedCache.getLocalCacheFiles(job);
                } catch (IOException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }

            public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
                for (Path f : localFile) {
                    System.out.println(f.toString());
                }
                mapKey.set("success");
                output.collect(mapKey, value);
            }
        }

        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(wordCount.class);
            conf.setJobName("wordcount");
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);
            conf.setMapperClass(Map.class);
            conf.setNumReduceTasks(0);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);
            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            String path = "hdfs://namenode:9000/hadoop/test1"; // this file has already been put on HDFS
            Path filePath = new Path(path);
            String uriWithLink = filePath.toUri().toString();
            DistributedCache.addCacheFile(new URI(uriWithLink), conf);

            JobClient.runJob(conf);
        }
    }
I get a NullPointerException at this point:

    for (Path f : localFile) {
        System.out.println(f.toString());
    }

The problem is that the value of f is null.
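For reference, the usual retrieval pattern with the old mapred API is to fetch the local paths in configure(), guard against DistributedCache.getLocalCacheFiles() returning null (which is exactly what produces the NullPointerException above when nothing was distributed to the task), and only then read the local copy. A minimal sketch, using the same imports as the class above; the class and field names are illustrative, not from my actual job:

    public static class CacheAwareMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private final Set<String> cachedLines = new HashSet<String>();

        @Override
        public void configure(JobConf job) {
            try {
                // getLocalCacheFiles() returns null when no file reached this task,
                // so check before iterating over the array.
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);
                if (cacheFiles == null || cacheFiles.length == 0) {
                    throw new RuntimeException("no file found in the DistributedCache");
                }
                // Read the first cached file from the task-local disk.
                BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                try {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        cachedLines.add(line.trim());
                    }
                } finally {
                    reader.close();
                }
            } catch (IOException e) {
                throw new RuntimeException("failed to read the cache file", e);
            }
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // cachedLines would be used here for the actual map-side join.
            output.collect(new Text("success"), value);
        }
    }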
I used the code below, but it wouldn't work.

    DistributedCache.createdSymlink(conf);
    DistributedCache.addCacheFile(new Path("hdfs://namenode:9000/hadoop/test1").toUri().toString() + "#" + "test1", conf);
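If I understand the old API correctly, the symlink variant would be wired up roughly as below on the driver side: the method is createSymlink (not createdSymlink), addCacheFile() takes a java.net.URI rather than a String, and both calls have to happen before JobClient.runJob(conf). This is only a sketch against the same namenode URI as above:

    // Driver side: the "#test1" fragment asks for a symlink named test1
    // in each task's working directory.
    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(new URI("hdfs://namenode:9000/hadoop/test1#test1"), conf);
    JobClient.runJob(conf);

    // Task side (e.g. in configure()): the cached file can then be opened
    // through the symlink name in the working directory.
    BufferedReader reader = new BufferedReader(new FileReader("test1"));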