[toc]

I. Requirements Analysis

1. Requirement and expected result

Count the total upstream traffic, total downstream traffic, and total traffic consumed by each phone number.

  1. Input data
     (screenshot of the sample input file)

  2. Input data format:
     7	13560436666	120.196.100.99	1116	954	200
     (id, phone number, network IP, upstream traffic, downstream traffic, network status code)

  3. Expected output format:
     13560436666	1116	954	2070
     (phone number, upstream traffic, downstream traffic, total traffic)

2. Requirement analysis

(diagram: map/reduce design for the traffic-summing job)
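
In outline, as implemented in the code below: the mapper parses each input line, emits the phone number as the output key and a custom FlowBean carrying the upstream and downstream traffic as the output value; the framework then groups records by phone number, and the reducer sums the two traffic fields per key and recomputes the total. Because FlowBean travels through the shuffle between map and reduce, it must implement Hadoop's Writable serialization interface, which is the point of this exercise.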

II. Writing the Local Test Code

1. Create the com.learn.beandemo package in the MapReduceDemo project

(screenshot: the new package in the project tree)

2. Write the bean code

package com.learn.beandemo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // Hadoop instantiates the bean via reflection during deserialization,
    // so a public no-arg constructor must stay available.
    public FlowBean() {}

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Convenience overload: derive the total from the up and down flows.
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    /**
     * Serialization: write the fields to the output stream.
     * @param out the output to write to
     * @throws IOException if the write fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization: read the fields back in exactly the same order
     * they were written in write().
     * @param in the input to read from
     * @throws IOException if the read fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Tab-separated, matching the expected output format.
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
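
Since write() and readFields() must handle the fields in exactly the same order, a quick local round-trip check is worth having before wiring the bean into a job. A minimal sketch (the test class and its name are ours, not part of the original project):

package com.learn.beandemo;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {

    public static void main(String[] args) throws IOException {
        FlowBean before = new FlowBean();
        before.setUpFlow(1116);
        before.setDownFlow(954);
        before.setSumFlow();

        // Serialize into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean; any mismatch in field order
        // between write() and readFields() would scramble the values.
        FlowBean after = new FlowBean();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(after); // expected: 1116	954	2070
    }
}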

3. Write the mapper code

package com.learn.beandemo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map task: parse each input line and emit (phone number, FlowBean).
 *
 * Sample input line:
 *   7	13560436666	120.196.100.99	1116	954	200
 *   (id, phone number, network IP, upstream traffic, downstream traffic, network status code)
 *
 * Expected final output:
 *   13560436666	1116	954	2070
 *   (phone number, upstream traffic, downstream traffic, total traffic)
 */
public class BeanMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Optimization: hoist the output key and value to instance fields
    // so one pair of objects is reused across map() calls.
    FlowBean flowBean = new FlowBean();
    Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1. Convert the Hadoop Text into a Java String so the data can be manipulated
        String line = value.toString();

        // 2. Split the line on tabs to separate the fields
        String[] split = line.split("\t");

        // 3. Populate the bean; the traffic fields are indexed from the end of the array
        flowBean.setUpFlow(Long.parseLong(split[split.length - 3]));
        flowBean.setDownFlow(Long.parseLong(split[split.length - 2]));
        flowBean.setSumFlow();

        // 4. Emit (phone number, bean) downstream
        text.set(split[1]);
        context.write(text, flowBean);
    }
}
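
Note that the traffic fields are read from the end of the array (split.length - 3, split.length - 2) rather than from the front; in the classic version of this dataset some records carry extra middle columns (such as a visited domain name), and end-relative indexing tolerates them. That reading is ours, not stated in the original. A standalone check of the extraction (SplitCheck is a hypothetical helper class):

public class SplitCheck {

    public static void main(String[] args) {
        String line = "7\t13560436666\t120.196.100.99\t1116\t954\t200";
        String[] split = line.split("\t");

        System.out.println(split[1]);                // 13560436666 (phone number)
        System.out.println(split[split.length - 3]); // 1116 (upstream traffic)
        System.out.println(split[split.length - 2]); // 954 (downstream traffic)
    }
}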

4. Write the reducer code

package com.learn.beandemo;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce task: for each key (phone number), sum the upstream and downstream
 * traffic over all FlowBean values, then recompute the total.
 *
 * Expected output per key:
 *   13560436666	1116 (sum)	954 (sum)	2070 (sum)
 *   (phone number, upstream traffic sum, downstream traffic sum, total traffic sum)
 */
public class BeanReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    // Optimization: reuse a single output bean across reduce() calls
    FlowBean flowBean = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        // 1. Accumulate the upstream and downstream traffic over all values.
        // Hadoop refills one FlowBean instance per iteration, so the sums
        // must be taken here inside the loop.
        long upFlowSum = 0;
        long downFlowSum = 0;
        for (FlowBean bean : values) {
            upFlowSum += bean.getUpFlow();
            downFlowSum += bean.getDownFlow();
        }

        // 2. Populate the output bean
        flowBean.setUpFlow(upFlowSum);
        flowBean.setDownFlow(downFlowSum);
        flowBean.setSumFlow();

        // 3. Emit the result; the incoming key can be written out directly
        context.write(key, flowBean);
    }
}
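
A detail worth knowing about the loop above: Hadoop reuses a single FlowBean instance for the whole values iterator, refilling it via readFields() on each step. Summing inside the loop is therefore required; collecting the bean references into a list for later processing would leave you with many pointers to the last record.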

5. Write the driver code

package com.learn.beandemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class BeanDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Set the driver class
        job.setJarByClass(BeanDriver.class);

        // 3. Set the mapper and reducer classes
        job.setMapperClass(BeanMapper.class);
        job.setReducerClass(BeanReducer.class);

        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. Set the input path and the result output path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);
        System.out.println(b ? "success" : "failure");
    }
}
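
Once packaged as a jar, the same driver can also be submitted to a real cluster. A sketch of the command (the jar name and HDFS paths are placeholders, not from the original):

hadoop jar MapReduceDemo.jar com.learn.beandemo.BeanDriver /input/phone_data.txt /output/flowcount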

III. Testing

1. Edit the run configuration

In the Run/Debug configuration, add the input path and the output path as program arguments, separated by a space.
(screenshot: IDE run configuration)
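
For example (these local paths are placeholders): D:\hadoop\input\phone_data.txt D:\hadoop\output. Note that the output directory must not already exist; FileOutputFormat fails the job with a FileAlreadyExistsException if it does.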

2. Check the results

(screenshots of the run results)

Q.E.D.


Only creation is true enjoyment; only striving makes a full life.