MapReduce跨大小文件操作

一:需求描述:

统计一类商品总共卖出的金额

二:源文件:

2.1Order文件

订单ID  时间   商品ID  购买数量 
1001 20170710 4 2
1002 20170710 3 100
1003 20170710 2 40
1004 20170711 2 23
1005 20170823 4 55
1006 20170824 3 20
1007 20170825 2 3
1008 20170826 4 23
1009 20170912 2 10
1010 20170913 2 2
1011 20170914 3 14
1012 20170915 3 18

2.2Producet文件


商品ID 商品名称  商品价格
1 chuizi 3999
2 huawei 3999
3 xiaomi 2999
4 apple 5999

三:跨文件操作

3.1目的

避免数据倾斜,提高计算效率

3.2实现思路

将小文件(Product文件)缓存起来,大文件处理过程中需要小文件时候再去缓存中获取,就无需单独启动一个MapTask处理小文件

3.3代码结构


3.3.1新建属性类(大文件)

根据源文件和预期结果来定义属性类的属性
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Order implements Writable {
        //序列化时属性不准为空
	private int oid;
	private String date = "";
	private int pid;
	private int num;
	private String name = "";
	private double price;

	public int getOid() {
		return oid;
	}

	public void setOid(int oid) {
		this.oid = oid;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}

	public int getPid() {
		return pid;
	}

	public void setPid(int pid) {
		this.pid = pid;
	}

	public int getNum() {
		return num;
	}

	public void setNum(int num) {
		this.num = num;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

	@Override()
	public void write(DataOutput out) throws IOException {
		out.writeInt(oid);
		out.writeUTF(date);
		out.writeInt(pid);
		out.writeInt(num);
		out.writeUTF(name);
		out.writeDouble(price);
	}

	@Override()
	public void readFields(DataInput in) throws IOException {
		this.oid = in.readInt();
		this.date = in.readUTF();
		this.pid = in.readInt();
		this.num = in.readInt();
		this.name = in.readUTF();
		this.price = in.readDouble();
	}

}


注意:
  • 序列化时读的顺序也是写的顺序

3.3.2Driver类


注意:
  • 在Driver中实现小文件的缓存
  • 缓存的不是小文件本身内容,而是其在HDFS中的地址
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinDriver {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf, "JobName");
		job.setJarByClass(cn.ttdd.join.JoinDriver.class);
		job.setMapperClass(JoinMapper.class);
		job.setReducerClass(JoinReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Order.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);  
                
        //直接用目标小文件的地址初始化URI数组
        URI[] files = { new URI("hdfs://192.168.197.130:9000/txt/union/product.txt") };

        //直接调用job中的setCacheFiles方法,传入小文件在HDFS中的URI地址,此处也可以缓存多个文件
		job.setCacheFiles(files);
                
        //输入文件 写的是大文件的路径
		FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.197.130:9000/txt/union/order.txt"));

		FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.197.130:9000/result/join"));

		if (!job.waitForCompletion(true))
			return;
	}

}



3.3.3Mapper

setup方法
  • 执行时间:map之前,可作为初始化方法
  • 作用:从缓存中获取小文件路径,并从HDFS从获取小文件数据,读取小文件数据后通过映射关系保存起来再传递给map方法。
  • 因为此处需要用到映射关系,所以需要提前new一个map,不在map中new。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JoinMapper extends Mapper<LongWritable, Text, Text, Order> {

	private Map<Integer, Order> map = new HashMap<>();

	//获取小文件数据
	@Override()
	protected void setup(Mapper<LongWritable, Text, Text, Order>.Context context)
			throws IOException, InterruptedException {

		//set入job中的内容可以通过context
		//获取缓存URI数组中目标小文件的地址
		URI file = context.getCacheFiles()[0];

		//根据获取到地址连接HDFS
		FileSystem fs = FileSystem.get(file, context.getConfiguration());  
                //获取针对这个文件的字节输入流
		InputStream in = fs.open(new Path(file));

		//将字节流包装为字符流,方便按行读取
		BufferedReader reader = new BufferedReader(new InputStreamReader(in));

		//readLine循环按行读取数据
		while ((line = reader.readLine()) != null) {

			// 此时获取到的line也就是行数据结构为 line = "1 chuizi 3999"

			// 将行数据切分后解析为属性类对象(此处的对象是为了方便存储)
			String[] arr = line.split(" ");
			Order o = new Order();
			o.setPid(Integer.parseInt(arr[0]));
			o.setName(arr[1]);
			o.setPrice(Double.parseDouble(arr[2]));

			// 将解析后的对象放到容器中,以便map方法适用
			map.put(o.getPid(), o);
		}

		// 关流
		reader.close();
	}

	//map方法处理大文件,并调用小文件保存到映射关系中的数据
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		//一行行读取 1001 20170710 4 2
		//切分行数据
		String[] arr = value.toString().split(" ");

		//将大文件字段属性set入对象中(此处的对象是为了输出)
		Order o = new Order();
		o.setOid(Integer.parseInt(arr[0]));
		o.setDate(arr[1]);
		o.setPid(Integer.parseInt(arr[2]));
		o.setNum(Integer.parseInt(arr[3])); 

		//对象中另一部分不属于大文件的小文件的数据,需要从映射关系map中获取
		//此处o.getPid()获取的是大文件属性的商品id,也是小文件中的商品
		//通过该id去map映射关系中获取到小文件的对象(小文件对象set进入map的时候以商品id为K)
		//再通过从小文件的对象获取所需要的属性set入大文件的对象中   o.setName(map.get(o.getPid()).getName());
		o.setPrice(map.get(o.getPid()).getPrice());

		// 写出 大文件对象
		context.write(new Text(o.getName()), o);

	}

}


数据流程示意图:



3.3.4Reducer代码

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer 
	extends Reducer<Text, Order, Text, DoubleWritable> {

	public void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException {
		double sum = 0;
		for (Order val : values) {
			sum += val.getNum() * val.getPrice();
		}
		context.write(key, new DoubleWritable(sum));
	}

}




全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务