数据结构:
NewOrderBean
001 | public class NewOrderBean implements WritableComparable<NewOrderBean>{ |
002 | private String orderId; |
003 | private String orderUser; |
004 | private String orderName; |
005 | private float orderPrice; |
006 | private int orderNum; |
007 | private float totalPrice; |
008 | |
009 | public NewOrderBean() {} |
010 |
011 | public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) { |
012 | super (); |
013 | this .orderId = orderId; |
014 | this .orderUser = orderUser; |
015 | this .orderName = orderName; |
016 | this .orderPrice = orderPrice; |
017 | this .orderNum = orderNum; |
018 | this .totalPrice = orderPrice * orderNum; |
019 | } |
020 | |
021 | public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) { |
022 | this .orderId = orderId; |
023 | this .orderUser = orderUser; |
024 | this .orderName = orderName; |
025 | this .orderPrice = Float.parseFloat(orderPrice); |
026 | this .orderNum = Integer.parseInt(orderNum); |
027 | this .totalPrice = this .orderPrice * this .orderNum; |
028 | |
029 | } |
030 | |
031 | |
032 | public String getOrderUser() { |
033 | return orderUser; |
034 | } |
035 |
036 | public void setOrderUser(String orderUser) { |
037 | this .orderUser = orderUser; |
038 | } |
039 |
040 | public String getOrderId() { |
041 | return orderId; |
042 | } |
043 |
044 | public void setOrderId(String orderId) { |
045 | this .orderId = orderId; |
046 | } |
047 |
048 | public String getOrderName() { |
049 | return orderName; |
050 | } |
051 |
052 | public void setOrderName(String orderName) { |
053 | this .orderName = orderName; |
054 | } |
055 |
056 | public double getOrderPrice() { |
057 | return orderPrice; |
058 | } |
059 |
060 | public void setOrderPrice( float orderPrice) { |
061 | this .orderPrice = orderPrice; |
062 | } |
063 |
064 | public int getOrderNum() { |
065 | return orderNum; |
066 | } |
067 |
068 | public void setOrderNum( int orderNum) { |
069 | this .orderNum = orderNum; |
070 | } |
071 |
072 | public float getTotalPrice() { |
073 | return totalPrice; |
074 | } |
075 |
076 | public void setTotalPrice( float totalPrice) { |
077 | this .totalPrice = totalPrice; |
078 | } |
079 | |
080 | |
081 | @Override |
082 | public String toString() { |
083 | return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName |
084 | + ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]" ; |
085 | } |
086 | |
087 |
088 | public void readFields(DataInput in) throws IOException { |
089 | // TODO Auto-generated method stub |
090 | this .orderId = in.readUTF(); |
091 | this .orderUser = in.readUTF(); |
092 | this .orderName = in.readUTF(); |
093 | this .orderPrice = in.readFloat(); |
094 | this .orderNum = in.readInt(); |
095 | this .totalPrice = this .orderPrice * this .orderNum; |
096 | } |
097 |
098 | |
099 | public void write(DataOutput out) throws IOException { |
100 | // TODO Auto-generated method stub |
101 | out.writeUTF(orderId); |
102 | out.writeUTF(orderUser); |
103 | out.writeUTF(orderName); |
104 | out.writeFloat(orderPrice); |
105 | out.writeInt(orderNum); |
106 | } |
107 |
108 | public int compareTo(NewOrderBean o) { |
109 | // TODO Auto-generated method stub |
110 | //return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ; |
111 | //return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice); |
112 | |
113 | return this .orderId.compareTo(o.getOrderId())== 0 ?Float.compare(o.getTotalPrice(), this .getTotalPrice()): this .orderId.compareTo(o.getOrderId()); |
114 | } |
115 | |
116 | |
117 | } |
partition 分区
1 | public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable>{ |
2 | @Override |
3 | public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) { |
4 | // TODO Auto-generated method stub |
5 | // 按照订单中的orderid来分发数据 |
6 | return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; |
7 | } |
8 |
9 | } |
grouping 分组
01 | public class OrderGrouping extends WritableComparator { |
02 | |
03 | public OrderGrouping() { |
04 | super (NewOrderBean. class , true ); |
05 | } |
06 | |
07 | |
08 | @Override |
09 | public int compare(WritableComparable a, WritableComparable b) { |
10 | // TODO Auto-generated method stub |
11 | |
12 | NewOrderBean o1 = (NewOrderBean)a; |
13 | NewOrderBean o2 = (NewOrderBean)b; |
14 | |
15 | return o1.getOrderId().compareTo(o2.getOrderId()); |
16 | } |
17 | } |
main 处理
01 | public class GroupComparator { |
02 | |
03 | public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable>{ |
04 | |
05 | NewOrderBean orderBean = new NewOrderBean(); |
06 |
07 | @Override |
08 | protected void map(LongWritable key, Text value, |
09 | Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context) |
10 | throws IOException, InterruptedException { |
11 | |
12 | String line = value.toString(); |
13 | String[] fields = line.split( "," ); |
14 | |
15 | orderBean.set(fields[ 0 ], fields[ 1 ], fields[ 2 ], fields[ 3 ], fields[ 4 ]); |
16 | |
17 | /* |
18 | k.set(fields[0]); |
19 | context.write(k, orderBean); |
20 | */ |
21 | // 按照key写了一堆的数据 |
22 | context.write(orderBean, NullWritable.get()); |
23 | } |
24 | |
25 | } |
26 | |
27 | |
28 | public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>{ |
29 | @Override |
30 | protected void reduce(NewOrderBean key, Iterable<NullWritable> values, |
31 | Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context) |
32 | throws IOException, InterruptedException { |
33 | |
34 | //分组输出 |
35 | int topn = context.getConfiguration().getInt("order.top.n",3); |
36 | |
37 | /* |
38 | for(int i=0;i<topn;i++) |
39 | { |
40 | NewOrderBean o = key; |
41 | context.write(o, NullWritable.get()); |
42 | } |
43 | */ |
44 | int i= 0 ; |
45 | for (NullWritable v : values) { |
46 | context.write(key, v); |
47 | if (++i==topn) return ; |
48 | } |
49 | |
50 | |
51 | |
52 | } |
53 | } |
54 | |
55 | |
56 | |
57 | public static void main(String[] args) throws Exception{ |
58 | |
59 | Configuration conf = new Configuration(); |
60 | conf.setInt( "order.top.n" , 3 ); |
61 | Job job = Job.getInstance(conf); |
62 | |
63 | job.setJarByClass(GroupComparator. class ); |
64 | |
65 | job.setMapperClass(GroupMapper. class ); |
66 | //设置partition |
67 | job.setPartitionerClass(OrderPartitioner. class ); |
68 | job.setReducerClass(GroupReducer. class ); |
69 | |
70 | //设置grouping |
71 | job.setGroupingComparatorClass(OrderGrouping. class ); |
72 | |
73 | job.setMapOutputKeyClass(NewOrderBean. class ); |
74 | job.setMapOutputValueClass(NullWritable. class ); |
75 | |
76 | job.setOutputKeyClass(NewOrderBean. class ); |
77 | job.setOutputValueClass(NullWritable. class ); |
78 | |
79 | FileInputFormat.setInputPaths(job, new Path( "e:/mrdata/wordcount/input" )); |
80 | FileOutputFormat.setOutputPath(job, new Path( "e:/mrdata/wordcount/output12" )); |
81 | |
82 | job.setNumReduceTasks( 2 ); |
83 | |
84 | boolean res = job.waitForCompletion( true ); |
85 | System.exit(res? 0 :- 1 ); |
86 | |
87 | } |
88 |
89 | } |
“[大数据]MapReduce 排序控制、分区控制、分组控制”有 0 条评论