Hadoop Basics - Hadoop Development Environment Setup and Programming

Published on 2023-04-20 21:41:07

Development environment: IntelliJ IDEA on macOS

Create a New Project

Create the project using Maven and select Java 8 (any installed JDK version can be chosen here).

Fill in the GroupId (the unique identifier of the project's organization; it maps to the Java package structure, i.e. the directory layout under main/java) and the ArtifactId (the unique identifier of the project itself; it maps to the project name, i.e. the name of the project's root directory).

Set hadoop.version=2.7.2 by editing the pom.xml file as below, then click Import Changes to install the dependencies. Maven downloads them into the {USER_HOME}/.m2 directory.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.insideRia</groupId>
    <artifactId>hdfsFileSystem</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.7.2</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-minicluster</artifactId>
            <version>${hadoop.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
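If you prefer the command line to IDEA's Import Changes, Maven can resolve the dependencies and build the project directly (assuming mvn is on your PATH):

$ mvn clean package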

Under src -> main -> java, create a new Java file named WordCount.

Compile and Package

Click File -> Project Structure and edit in the dialog that pops up.

Select only the wordcount artifact to package; the Hadoop libraries already exist on the cluster, so there is no need to bundle and upload them. Click Apply.

To compile, click Build -> Build Artifacts. The generated jar is located at {$HOME}/out/artifacts/wordcount_jar/wordcount.jar.

The resulting wordcount.jar contains only the project's compiled classes.
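You can verify this by listing the jar's contents with the JDK's jar tool:

$ jar tf {$HOME}/out/artifacts/wordcount_jar/wordcount.jar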

WordCount Example

Create a wordcount.txt file and upload it to hdfs://data/input/wordcount.txt:

hadoop inside hive
walter boy handcome
salary inside hadoop
baby love hadoop
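One way to perform the upload, assuming the Hadoop CLI is available on the client machine:

$ hdfs dfs -mkdir -p /data/input
$ hdfs dfs -put wordcount.txt /data/input/wordcount.txt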

Compile WordCount and package it into a jar. WordCount.java:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static class TokenizerMapper
            extends Mapper<LongWritable, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the line into whitespace-delimited words, e.g. "hadoop inside hive"
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
        private IntWritable result = new IntWritable();

        // values arrive grouped by key, e.g. (hadoop, {1, 1, 1}); sum the counts
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val: values){
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        if (args.length < 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
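Since the pom already declares JUnit and MRUnit, the mapper can also be unit-tested without a cluster. A minimal sketch (the test class name is illustrative), placed under src/test/java:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordCountTest {
    @Test
    public void mapEmitsOnePerWord() throws Exception {
        // feed one line to the mapper and assert the (word, 1) pairs it emits, in order
        MapDriver.newMapDriver(new WordCount.TokenizerMapper())
                .withInput(new LongWritable(0), new Text("hadoop inside hive"))
                .withOutput(new Text("hadoop"), new IntWritable(1))
                .withOutput(new Text("inside"), new IntWritable(1))
                .withOutput(new Text("hive"), new IntWritable(1))
                .runTest();
    }
}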

Run wordcount.jar

$ hadoop jar wordcount.jar /data/input /data/output
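MapReduce refuses to start if the output directory already exists, so remove /data/output before re-running. Once the job finishes, the counts can be read back from the reducer's output file:

$ hdfs dfs -cat /data/output/part-r-00000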

Student Total Score Example

Create a score.txt file and upload it to hdfs://data/score/score.txt:

1 jack 78 15
2 tome 23 16
3 jane 45 14
1 jack 90 15
2 tome 56 16
3 jane 88 14

Analysis

In the map phase, each line is parsed and emitted keyed by student id:

{1, user}, {1, user}

In the reduce phase, the framework groups the records by id, so each student's scores can be summed:

{1, {user, user}}
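Concretely, for the sample data above the reducer receives and emits:

(1, [{jack, 78, 15}, {jack, 90, 15}])  ->  {jack, 168, 15}
(2, [{tome, 23, 16}, {tome, 56, 16}])  ->  {tome, 79, 16}
(3, [{jane, 45, 14}, {jane, 88, 14}])  ->  {jane, 133, 14}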

Running the MapReduce program yields the following per-student totals:

jack 168 15
tome 79 16
jane 133 14

Source Code

UserMapper.java

package cleland.club;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;


public class UserMapper extends Mapper<LongWritable, Text, Text, UserWritable>{

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // each line holds four whitespace-separated fields: id name score age
        StringTokenizer str = new StringTokenizer(value.toString());
        int counter = 0;

        String id = "";
        String name = "";
        int score = 0;
        short age = 0;
        while(str.hasMoreTokens()){
            if(counter == 0){
                id = str.nextToken();
            }else if(counter == 1){
                name = str.nextToken();
            }else if(counter == 2){
                score = Integer.parseInt(str.nextToken());
            }else if(counter == 3){
                age = Short.parseShort(str.nextToken());
            }
            counter += 1;
        }

        context.write(new Text(id), new UserWritable(id,name,score,age));
    }
}

UserReducer.java

package cleland.club;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class UserReducer extends Reducer<Text, UserWritable, Text, UserWritable> {

    public void reduce(Text key, Iterable<UserWritable> values, Context context) throws IOException, InterruptedException {
        int totalScore = 0;
        UserWritable resultUser = null;

        for (UserWritable user : values) {
            // Hadoop reuses the same UserWritable instance across iterations of values,
            // so copy the first record's fields rather than keeping a reference to it
            if (resultUser == null) {
                resultUser = new UserWritable(user.getId(), user.getName(), user.getScore(), user.getAge());
            }
            totalScore += user.getScore();
        }
        resultUser.setScore(totalScore);
        context.write(key,resultUser);
    }
}

UserScoreInfo.java

package cleland.club;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class UserScoreInfo {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        if (args.length < 2) {
            System.err.println("Usage: userscoreinfo <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "student total score");
        job.setJarByClass(UserScoreInfo.class);
        job.setMapperClass(UserMapper.class);
        job.setReducerClass(UserReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(UserWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

UserWritable.java (custom Writable type)

package cleland.club;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class UserWritable implements WritableComparable<UserWritable> {
    private String name = "";
    private int score = 0;
    private short age = 0;
    private String id = "";

    // deserialization instantiates the object via reflection, which requires a no-arg constructor
    public UserWritable() {}

    public UserWritable(String id, String name, int score, short age){
        this.id = id;
        this.name = name;
        this.score = score;
        this.age = age;
    }

    public String getName(){
        return name;
    }

    public void setName(String name){
        this.name = name;
    }

    public int getScore(){
        return score;
    }

    public void setScore(int score){
        this.score = score;
    }

    public short getAge(){
        return age;
    }

    public void setAge(short age){
        this.age = age;
    }

    public String getId(){
        return this.id;
    }

    public void setId(String id){
        this.id = id;
    }

    public void set(String id, String name, int score, short age){
        this.id = id;
        this.name = name;
        this.score = score;
        this.age = age;
    }

    @Override
    public int compareTo(UserWritable o) {
        int result = this.name.compareTo(o.getName());
        if (result != 0) {
            return result;
        }

        // Integer.compare returns 0 for equal scores, keeping compareTo consistent with equals
        return Integer.compare(this.score, o.getScore());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException{
        dataOutput.writeUTF(this.id);
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.score);
        dataOutput.writeShort(this.age);
    }

    @Override
    public void readFields(DataInput dataInput) throws  IOException{
        this.id = dataInput.readUTF();
        this.name = dataInput.readUTF();
        this.score = dataInput.readInt();
        this.age = dataInput.readShort();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        UserWritable that = (UserWritable) o;

        if (score != that.score) return false;
        if (age != that.age) return false;
        if (name != null ? !name.equals(that.name) : that.name != null) return false;
        return id != null ? id.equals(that.id) : that.id == null;
    }

    @Override
    public int hashCode(){
        int result = name != null ? name.hashCode() : 0;
        result = 31 * result + score;
        result = 31 * result + (int) age;
        result = 31 * result + (id != null ? id.hashCode() : 0);
        return result;
    }

    @Override
    public String toString(){
        return "name='" + name + '\'' +
                ", score=" + score +
                ", age=" + age +
                ", id='" + id + "'";
    }
}
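Package these four classes into a jar the same way as the WordCount example and submit the job; the jar name and output path below are illustrative:

$ hadoop jar userscore.jar cleland.club.UserScoreInfo /data/score /data/score-output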
