[大数据]MapReduce 排序控制、分区控制、分组控制

数据结构:

NewOrderBean（作为 Map 输出的 key：先按 orderId 升序排序，同一订单内再按总金额 totalPrice 降序排序）

001public class NewOrderBean implements WritableComparable<NewOrderBean>{
002    private String orderId;
003    private String orderUser;
004    private String orderName;
005    private float orderPrice;
006    private int orderNum;
007    private float totalPrice;
008     
009    public NewOrderBean() {}
010 
011    public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) {
012        super();
013        this.orderId = orderId;
014        this.orderUser = orderUser;
015        this.orderName = orderName;
016        this.orderPrice = orderPrice;
017        this.orderNum = orderNum;
018        this.totalPrice  = orderPrice * orderNum;
019    }
020     
021    public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) {
022        this.orderId = orderId;
023        this.orderUser = orderUser;
024        this.orderName = orderName;
025        this.orderPrice = Float.parseFloat(orderPrice);
026        this.orderNum = Integer.parseInt(orderNum);
027        this.totalPrice = this.orderPrice * this.orderNum;
028         
029    }
030     
031     
032    public String getOrderUser() {
033        return orderUser;
034    }
035 
036    public void setOrderUser(String orderUser) {
037        this.orderUser = orderUser;
038    }
039 
040    public String getOrderId() {
041        return orderId;
042    }
043 
044    public void setOrderId(String orderId) {
045        this.orderId = orderId;
046    }
047 
048    public String getOrderName() {
049        return orderName;
050    }
051 
052    public void setOrderName(String orderName) {
053        this.orderName = orderName;
054    }
055 
056    public double getOrderPrice() {
057        return orderPrice;
058    }
059 
060    public void setOrderPrice(float orderPrice) {
061        this.orderPrice = orderPrice;
062    }
063 
064    public int getOrderNum() {
065        return orderNum;
066    }
067 
068    public void setOrderNum(int orderNum) {
069        this.orderNum = orderNum;
070    }
071 
072    public float getTotalPrice() {
073        return totalPrice;
074    }
075 
076    public void setTotalPrice(float totalPrice) {
077        this.totalPrice = totalPrice;
078    }
079     
080     
081    @Override
082    public String toString() {
083        return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName
084                + ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]";
085    }
086     
087 
088    public void readFields(DataInput in) throws IOException {
089        // TODO Auto-generated method stub
090        this.orderId = in.readUTF();
091        this.orderUser = in.readUTF();
092        this.orderName = in.readUTF();
093        this.orderPrice = in.readFloat();
094        this.orderNum = in.readInt();
095        this.totalPrice = this.orderPrice * this.orderNum;
096    }
097 
098     
099    public void write(DataOutput out) throws IOException {
100        // TODO Auto-generated method stub
101        out.writeUTF(orderId);
102        out.writeUTF(orderUser);
103        out.writeUTF(orderName);
104        out.writeFloat(orderPrice);
105        out.writeInt(orderNum);
106    }
107 
108    public int compareTo(NewOrderBean o) {
109        // TODO Auto-generated method stub
110        //return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ;
111        //return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice);
112         
113        return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getTotalPrice(), this.getTotalPrice()):this.orderId.compareTo(o.getOrderId());
114    }
115     
116     
117}


Partition 分区：按 orderId 的哈希值分发数据，保证同一订单的所有记录进入同一个 ReduceTask

1public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable>{
2    @Override
3    public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) {
4        // TODO Auto-generated method stub
5        // 按照订单中的orderid来分发数据
6        return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
7    }
8 
9}


Grouping 分组：只比较 orderId，使同一订单的所有记录在同一次 reduce 调用中处理

01public class OrderGrouping extends WritableComparator {
02     
03    public OrderGrouping() {
04        super(NewOrderBean.class,true);
05    }
06     
07     
08    @Override
09    public int compare(WritableComparable a, WritableComparable b) {
10        // TODO Auto-generated method stub
11         
12        NewOrderBean o1 = (NewOrderBean)a;
13        NewOrderBean o2 = (NewOrderBean)b;
14         
15        return o1.getOrderId().compareTo(o2.getOrderId());
16    }
17}


Main 处理（Mapper、Reducer 与作业配置）：每个订单输出总金额最高的前 N 条记录，N 由配置项 order.top.n 指定（main 中设为 3）

01public class GroupComparator {
02     
03    public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable>{
04         
05        NewOrderBean orderBean = new NewOrderBean();
06 
07        @Override
08        protected void map(LongWritable key, Text value,
09                Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context)
10                throws IOException, InterruptedException {
11             
12            String line = value.toString();
13            String[] fields = line.split(",");
14             
15            orderBean.set(fields[0], fields[1], fields[2], fields[3], fields[4]);
16             
17            /*
18            k.set(fields[0]);
19            context.write(k, orderBean);
20            */
21            // 按照key写了一堆的数据
22            context.write(orderBean, NullWritable.get());  
23        }
24         
25    }
26     
27     
28    public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>{
29        @Override
30        protected void reduce(NewOrderBean key, Iterable<NullWritable> values,
31                Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context)
32                throws IOException, InterruptedException {
33             
34            //分组输出
35            int topn = context.getConfiguration().getInt("order.top.n",3);
36             
37            /*
38            for(int i=0;i<topn;i++)
39            {
40                NewOrderBean o = key;
41                context.write(o, NullWritable.get());
42            }
43            */
44            int i=0;
45            for (NullWritable v : values) {
46                context.write(key, v);
47                if(++i==topn) return;
48            }
49             
50             
51             
52        }
53    }
54     
55     
56     
57    public static void main(String[] args) throws Exception{
58         
59        Configuration conf = new Configuration();
60        conf.setInt("order.top.n", 3);
61        Job job = Job.getInstance(conf);
62         
63        job.setJarByClass(GroupComparator.class);
64         
65        job.setMapperClass(GroupMapper.class);
66        //设置partition
67        job.setPartitionerClass(OrderPartitioner.class);
68        job.setReducerClass(GroupReducer.class);
69         
70        //设置grouping
71        job.setGroupingComparatorClass(OrderGrouping.class);
72         
73        job.setMapOutputKeyClass(NewOrderBean.class);
74        job.setMapOutputValueClass(NullWritable.class);
75         
76        job.setOutputKeyClass(NewOrderBean.class);
77        job.setOutputValueClass(NullWritable.class);
78         
79        FileInputFormat.setInputPaths(job, new Path("e:/mrdata/wordcount/input"));
80        FileOutputFormat.setOutputPath(job, new Path("e:/mrdata/wordcount/output12"));
81         
82        job.setNumReduceTasks(2);
83         
84        boolean res = job.waitForCompletion(true);
85        System.exit(res?0:-1); 
86         
87    }
88 
89}

原文链接: [大数据]MapReducer 排序控制、分区控制、分组控制 版权所有,转载时请注明出处,违者必究。
注明出处格式:流沙团 ( http://www.gyarmy.com/?post=533 )

发表评论

0则评论给“[大数据]MapReducer 排序控制、分区控制、分组控制”