数据结构:
NewOrderBean
public class NewOrderBean implements WritableComparable<NewOrderBean>{
private String orderId;
private String orderUser;
private String orderName;
private float orderPrice;
private int orderNum;
private float totalPrice;
public NewOrderBean() {}
public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) {
super();
this.orderId = orderId;
this.orderUser = orderUser;
this.orderName = orderName;
this.orderPrice = orderPrice;
this.orderNum = orderNum;
this.totalPrice = orderPrice * orderNum;
}
public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) {
this.orderId = orderId;
this.orderUser = orderUser;
this.orderName = orderName;
this.orderPrice = Float.parseFloat(orderPrice);
this.orderNum = Integer.parseInt(orderNum);
this.totalPrice = this.orderPrice * this.orderNum;
}
public String getOrderUser() {
return orderUser;
}
public void setOrderUser(String orderUser) {
this.orderUser = orderUser;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getOrderName() {
return orderName;
}
public void setOrderName(String orderName) {
this.orderName = orderName;
}
public double getOrderPrice() {
return orderPrice;
}
public void setOrderPrice(float orderPrice) {
this.orderPrice = orderPrice;
}
public int getOrderNum() {
return orderNum;
}
public void setOrderNum(int orderNum) {
this.orderNum = orderNum;
}
public float getTotalPrice() {
return totalPrice;
}
public void setTotalPrice(float totalPrice) {
this.totalPrice = totalPrice;
}
@Override
public String toString() {
return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName
+ ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]";
}
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.orderId = in.readUTF();
this.orderUser = in.readUTF();
this.orderName = in.readUTF();
this.orderPrice = in.readFloat();
this.orderNum = in.readInt();
this.totalPrice = this.orderPrice * this.orderNum;
}
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(orderId);
out.writeUTF(orderUser);
out.writeUTF(orderName);
out.writeFloat(orderPrice);
out.writeInt(orderNum);
}
public int compareTo(NewOrderBean o) {
// TODO Auto-generated method stub
//return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ;
//return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice);
return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getTotalPrice(), this.getTotalPrice()):this.orderId.compareTo(o.getOrderId());
}
}
partition分区
public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable>{
@Override
public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) {
// TODO Auto-generated method stub
// 按照订单中的orderid来分发数据
return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
grouping 分组
public class OrderGrouping extends WritableComparator {
public OrderGrouping() {
super(NewOrderBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// TODO Auto-generated method stub
NewOrderBean o1 = (NewOrderBean)a;
NewOrderBean o2 = (NewOrderBean)b;
return o1.getOrderId().compareTo(o2.getOrderId());
}
}
main处理
public class GroupComparator {
public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable>{
NewOrderBean orderBean = new NewOrderBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
orderBean.set(fields[0], fields[1], fields[2], fields[3], fields[4]);
/*
k.set(fields[0]);
context.write(k, orderBean);
*/
// 按照key写了一堆的数据
context.write(orderBean, NullWritable.get());
}
}
public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>{
@Override
protected void reduce(NewOrderBean key, Iterable<NullWritable> values,
Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context)
throws IOException, InterruptedException {
//分组输出
int topn = context.getConfiguration().getInt("order.top.n",3);
/*
for(int i=0;i<topn;i++)
{
NewOrderBean o = key;
context.write(o, NullWritable.get());
}
*/
int i=0;
for (NullWritable v : values) {
context.write(key, v);
if(++i==topn) return;
}
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.setInt("order.top.n", 3);
Job job = Job.getInstance(conf);
job.setJarByClass(GroupComparator.class);
job.setMapperClass(GroupMapper.class);
//设置partition
job.setPartitionerClass(OrderPartitioner.class);
job.setReducerClass(GroupReducer.class);
//设置grouping
job.setGroupingComparatorClass(OrderGrouping.class);
job.setMapOutputKeyClass(NewOrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(NewOrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("e:/mrdata/wordcount/input"));
FileOutputFormat.setOutputPath(job, new Path("e:/mrdata/wordcount/output12"));
job.setNumReduceTasks(2);
boolean res = job.waitForCompletion(true);
System.exit(res?0:-1);
}
}