File not found when using DistributedCache on Hadoop


I get a FileNotFoundException with the code below. This is a simple test of DistributedCache, and I can't figure out what the problem is.

The path of the file is correct, but I cannot find the file on the datanode:

package mapJoinTest2;

/*
 * this is for map join using DistributedCache
 * using class Path to get the cache file on the datanode
 *
 * 2012.1.13
 */
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class wordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        private Text word = new Text();
        private Text mapKey = new Text();
        private Path[] localFile = new Path[1];
        private FileSystem fs;

        public void configure(JobConf job) {
            try {
                fs = FileSystem.getLocal(new Configuration());
                // local paths of the files that were added to the distributed cache
                localFile = DistributedCache.getLocalCacheFiles(job);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // print the local cache paths so they show up in the task logs
            for (Path f : localFile) {
                System.out.println(f.toString());
            }

            mapKey.set("success");
            output.collect(mapKey, value);
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(wordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Map.class);
        conf.setNumReduceTasks(0);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // this file has already been put on HDFS
        String path = "hdfs://namenode:9000/hadoop/test1";
        Path filePath = new Path(path);
        String uriWithLink = filePath.toUri().toString();
        // register the HDFS file with the distributed cache before submitting the job
        DistributedCache.addCacheFile(new URI(uriWithLink), conf);

        JobClient.runJob(conf);
    }
}

I get a NullPointerException at this point:

    for(Path f:localFile){
        System.out.println(f.toString());
    }

The problem is that the value of f is null.
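
To rule out a wrong path on my side, the driver can at least check that the file really exists on HDFS before adding it to the cache. This is only a minimal sketch of that check (the cachePath and hdfs variables are names I made up for the example; the URI is the same one used in main above):

    // sketch only: confirm the cache file is really on HDFS before adding it,
    // to rule out a wrong path (same URI as in main above)
    Path cachePath = new Path("hdfs://namenode:9000/hadoop/test1");
    FileSystem hdfs = FileSystem.get(cachePath.toUri(), conf);
    if (!hdfs.exists(cachePath)) {
        throw new FileNotFoundException("cache file is not on HDFS: " + cachePath);
    }
    DistributedCache.addCacheFile(cachePath.toUri(), conf);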

I also tried the code below, but it wouldn't work either.

  DistributedCache.createSymlink(conf);
  DistributedCache.addCacheFile(new URI(new Path("hdfs://namenode:9000/hadoop/test1").toUri().toString() + "#" + "test1"), conf);
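If the symlink were actually created, the cached file should show up in the task's working directory under the link name ("test1" here, matching the fragment above), so configure() could read it directly by that name. This is only a sketch of what I would expect, not something I have working:

    public void configure(JobConf job) {
        try {
            // assumes createSymlink + the "#test1" fragment actually worked,
            // so "./test1" exists in the task working directory
            BufferedReader reader = new BufferedReader(new FileReader("test1"));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);   // just dump the cached file to the task log
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }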
