MapReduce跨大小文件操作
一:需求描述:
统计一类商品总共卖出的金额
二:源文件:
2.1Order文件
订单ID 时间 商品ID 购买数量 1001 20170710 4 2 1002 20170710 3 100 1003 20170710 2 40 1004 20170711 2 23 1005 20170823 4 55 1006 20170824 3 20 1007 20170825 2 3 1008 20170826 4 23 1009 20170912 2 10 1010 20170913 2 2 1011 20170914 3 14 1012 20170915 3 18
2.2Producet文件
商品ID 商品名称 商品价格 1 chuizi 3999 2 huawei 3999 3 xiaomi 2999 4 apple 5999
三:跨文件操作
3.1目的
避免数据倾斜,提高计算效率
3.2实现思路
将小文件(Product文件)缓存起来,大文件处理过程中需要小文件时候再去缓存中获取,就无需单独启动一个MapTask处理小文件
3.3代码结构
3.3.1新建属性类(大文件)
根据源文件和预期结果来定义属性类的属性
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class Order implements Writable { //序列化时属性不准为空 private int oid; private String date = ""; private int pid; private int num; private String name = ""; private double price; public int getOid() { return oid; } public void setOid(int oid) { this.oid = oid; } public String getDate() { return date; } public void setDate(String date) { this.date = date; } public int getPid() { return pid; } public void setPid(int pid) { this.pid = pid; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override() public void write(DataOutput out) throws IOException { out.writeInt(oid); out.writeUTF(date); out.writeInt(pid); out.writeInt(num); out.writeUTF(name); out.writeDouble(price); } @Override() public void readFields(DataInput in) throws IOException { this.oid = in.readInt(); this.date = in.readUTF(); this.pid = in.readInt(); this.num = in.readInt(); this.name = in.readUTF(); this.price = in.readDouble(); } }
注意:
- 序列化时读的顺序也是写的顺序
3.3.2Driver类
注意:
- 在Driver中实现小文件的缓存
- 缓存的不是小文件本身内容,而是其在HDFS中的地址
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JoinDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "JobName"); job.setJarByClass(cn.ttdd.join.JoinDriver.class); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Order.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); //直接用目标小文件的地址初始化URI数组 URI[] files = { new URI("hdfs://192.168.197.130:9000/txt/union/product.txt") }; //直接调用job中的setCacheFiles方法,传入小文件在HDFS中的URI地址,此处也可以缓存多个文件 job.setCacheFiles(files); //输入文件 写的是大文件的路径 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.197.130:9000/txt/union/order.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.197.130:9000/result/join")); if (!job.waitForCompletion(true)) return; } }
3.3.3Mapper
setup方法
- 执行时间:map之前,可作为初始化方法
- 作用:从缓存中获取小文件路径,并从HDFS从获取小文件数据,读取小文件数据后通过映射关系保存起来再传递给map方法。
- 因为此处需要用到映射关系,所以需要提前new一个map,不在map中new。
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class JoinMapper extends Mapper<LongWritable, Text, Text, Order> { private Map<Integer, Order> map = new HashMap<>(); //获取小文件数据 @Override() protected void setup(Mapper<LongWritable, Text, Text, Order>.Context context) throws IOException, InterruptedException { //set入job中的内容可以通过context //获取缓存URI数组中目标小文件的地址 URI file = context.getCacheFiles()[0]; //根据获取到地址连接HDFS FileSystem fs = FileSystem.get(file, context.getConfiguration()); //获取针对这个文件的字节输入流 InputStream in = fs.open(new Path(file)); //将字节流包装为字符流,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(in)); //readLine循环按行读取数据 while ((line = reader.readLine()) != null) { // 此时获取到的line也就是行数据结构为 line = "1 chuizi 3999" // 将行数据切分后解析为属性类对象(此处的对象是为了方便存储) String[] arr = line.split(" "); Order o = new Order(); o.setPid(Integer.parseInt(arr[0])); o.setName(arr[1]); o.setPrice(Double.parseDouble(arr[2])); // 将解析后的对象放到容器中,以便map方法适用 map.put(o.getPid(), o); } // 关流 reader.close(); } //map方法处理大文件,并调用小文件保存到映射关系中的数据 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //一行行读取 1001 20170710 4 2 //切分行数据 String[] arr = value.toString().split(" "); //将大文件字段属性set入对象中(此处的对象是为了输出) Order o = new Order(); o.setOid(Integer.parseInt(arr[0])); o.setDate(arr[1]); o.setPid(Integer.parseInt(arr[2])); o.setNum(Integer.parseInt(arr[3])); //对象中另一部分不属于大文件的小文件的数据,需要从映射关系map中获取 //此处o.getPid()获取的是大文件属性的商品id,也是小文件中的商品 //通过该id去map映射关系中获取到小文件的对象(小文件对象set进入map的时候以商品id为K) //再通过从小文件的对象获取所需要的属性set入大文件的对象中 o.setName(map.get(o.getPid()).getName()); o.setPrice(map.get(o.getPid()).getPrice()); // 写出 大文件对象 context.write(new Text(o.getName()), o); } }
数据流程示意图:
3.3.4Reducer代码
import java.io.IOException; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class JoinReducer extends Reducer<Text, Order, Text, DoubleWritable> { public void reduce(Text key, Iterable<Order> values, Context context) throws IOException, InterruptedException { double sum = 0; for (Order val : values) { sum += val.getNum() * val.getPrice(); } context.write(key, new DoubleWritable(sum)); } }