MapReduce(以Flow流量统计为)_完成度70%

一:需求分析
统计源数据中的流量
二:源数据
手机号码     地区     姓名 所用流量
13877779999 bj zs 2145
13766668888 sh ls 1028
13766668888 sh ls 9987
13877779999 bj zs 5678
13544445555 sz ww 10577
13877779999 sh zs 2145
13766668888 sh ls 9987


二:实现要点:
多个字段传递而非双字段,要传递的话需要将所有字段封装起来传递给mapper,
如果是数组或集合的话,数据类型要保持一致,此处数据类型不一致,所以考虑传递对象的方法
流程示意图


四:字段属性类
import org.apache.hadoop.io.Writable;

public class Flow implements Writable {

	private String phone;
	private String addr;
	private String name;
	private int flow;

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getAddr() {
		return addr;
	}

	public void setAddr(String addr) {
		this.addr = addr;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getFlow() {
		return flow;
	}

	public void setFlow(int flow) {
		this.flow = flow;
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.phone = in.readUTF();
		this.addr = in.readUTF();
		this.name = in.readUTF();
		this.flow = in.readInt();
	}

	// 如果需要进行序列化,那么只需要将有必要的属性依次写出即可
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phone);
		out.writeUTF(addr);
		out.writeUTF(name);
		out.writeInt(flow);
	}

}
注意:
1.实现Writable接口
让该类可以被序列化可以被传输给mapper
2.重写readFields方法
作用:该类
重写write方法


三:Mapper代码
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SerialFlowMapper 
	extends Mapper<LongWritable, Text, Text, Flow> {

        //处理一行行数据的逻辑,一行属性set进一个对象
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
	    	
                //1.切分数据
                String[] arr = value.toString().split(" ");
                
                //2.新建字段属性对象
		Flow f = new Flow();
                
                //3.切分出数据后,将有用的数据,存放set到对象中
		f.setPhone(arr[0]);
		f.setAddr(arr[1]);
		f.setName(arr[2]);
		f.setFlow(Integer.parseInt(arr[3]));
		
                //4.输出   姓名   对象(电话,地址,姓名,流量)
                context.write(new Text(f.getName()), f);
	}

}

四:Reducer代码
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PartFlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
        
        //每处理一次K 调用一次reduce
	public void reduce(Text key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		
                //values是Flow对象组合形成的迭代器
                for (Flow val : values) {
			sum += val.getFlow();
		}
                
                //此处输出的Key是名字 Value是流量统计用量
		context.write(key, new IntWritable(sum));
	}

}

五:Driver代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PartFlowDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		job.setJarByClass(cn.tedu.partflow.PartFlowDriver.class);
		job.setMapperClass(PartFlowMapper.class);
		job.setReducerClass(PartFlowReducer.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Flow.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.197.130:9000/txt/flow.txt"));
		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.197.130:9000/result/partflow"));

		if (!job.waitForCompletion(true))
			return;
	}

}
六:输出结果

七:新需求
需求描述:如何分地区统计每一个人消费的总流量,之前是将同一个人不同地区的流量消费统计在一块,此处要实现分地区统计。
实现要点:
分区(Partitioner):map的数据传输给分区后,分区将数据分类,一类数据对应一个Reduce处理,对应一个结果文件。
注意:无论分不分区,map和reduce的逻辑都不会发生改变,属性类也不会发生改变

九:新建分区类:FlowPartitioner
9.1代码结构
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowPartitioner extends Partitioner<Text, Flow> {

	// 在MapReducer中,分区默认是从0开始递增的
	@Override
	public int getPartition(Text key, Flow value, int numReduceTasks) {
		String addr = value.getAddr();
		if (addr.equals("bj"))
			return 0;
		else if (addr.equals("sh"))
			return 1;
		else
			return 2;
	}

}

9.2代码解析
分区类要继承Partitioner,并传入Map的输出泛型
重写getPartition方法:
参数:
  1. 第一参数是Map传入的k
  2. 第二参数是Map传入的v
  3. 第三参数是数据分区后对应的reduce任务数量
方法体逻辑:获取属性后,对属性进行判断,不同数据返回不同int值

返回值:int,也就是分区编号,默认是从0开始递增





全部评论

相关推荐

03-12 15:35
嘉应学院 Python
快说谢谢牛牛精灵:说不定就是下一个寒武纪!
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务