流沙团
[大数据] MapReduce 排序控制、分区控制、分组控制
2019-3-18 流沙团


数据结构（自定义 key）：



NewOrderBean




public class NewOrderBean implements WritableComparable<NewOrderBean>{
private String orderId;
private String orderUser;
private String orderName;
private float orderPrice;
private int orderNum;
private float totalPrice;

public NewOrderBean() {}

public NewOrderBean(String orderId,String orderUser, String orderName, float orderPrice, int orderNum) {
super();
this.orderId = orderId;
this.orderUser = orderUser;
this.orderName = orderName;
this.orderPrice = orderPrice;
this.orderNum = orderNum;
this.totalPrice = orderPrice * orderNum;
}

public void set(String orderId,String orderUser,String orderName,String orderPrice,String orderNum) {
this.orderId = orderId;
this.orderUser = orderUser;
this.orderName = orderName;
this.orderPrice = Float.parseFloat(orderPrice);
this.orderNum = Integer.parseInt(orderNum);
this.totalPrice = this.orderPrice * this.orderNum;

}


public String getOrderUser() {
return orderUser;
}

public void setOrderUser(String orderUser) {
this.orderUser = orderUser;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getOrderName() {
return orderName;
}

public void setOrderName(String orderName) {
this.orderName = orderName;
}

public double getOrderPrice() {
return orderPrice;
}

public void setOrderPrice(float orderPrice) {
this.orderPrice = orderPrice;
}

public int getOrderNum() {
return orderNum;
}

public void setOrderNum(int orderNum) {
this.orderNum = orderNum;
}

public float getTotalPrice() {
return totalPrice;
}

public void setTotalPrice(float totalPrice) {
this.totalPrice = totalPrice;
}


@Override
public String toString() {
return "OrderBean [orderId=" + orderId + ", orderUser=" + orderUser + ", orderName=" + orderName
+ ", orderPrice=" + orderPrice + ", orderNum=" + orderNum + ", totalPrice=" + totalPrice + "]";
}


public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
this.orderId = in.readUTF();
this.orderUser = in.readUTF();
this.orderName = in.readUTF();
this.orderPrice = in.readFloat();
this.orderNum = in.readInt();
this.totalPrice = this.orderPrice * this.orderNum;
}


public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(orderId);
out.writeUTF(orderUser);
out.writeUTF(orderName);
out.writeFloat(orderPrice);
out.writeInt(orderNum);
}

public int compareTo(NewOrderBean o) {
// TODO Auto-generated method stub
//return (int) (o.getOrderPrice() - this.orderPrice == 0 ? this.orderName.compareTo(o.getOrderName()) :o.getOrderPrice() - this.orderPrice) ;
//return Float.compare(o.getTotalPrice(), this.totalPrice) == 0 ? this.orderName.compareTo(o.getOrderName()) : Double.compare(o.getTotalPrice(), this.totalPrice);

return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getTotalPrice(), this.getTotalPrice()):this.orderId.compareTo(o.getOrderId());
}


}







分区（Partitioner）：按 orderId 把同一订单的数据发往同一个 reduce task



/**
 * Partitioner that sends all lines of one order to the same reduce task, keyed on the order id only
 * (not on the full composite key).
 */
public class OrderPartitioner extends Partitioner<NewOrderBean, NullWritable> {

    /**
     * Maps a key to a reducer index in {@code [0, numPartitions)}.
     *
     * @param key           composite order key; only its order id is used
     * @param value         ignored
     * @param numPartitions number of reduce tasks
     * @return target partition index
     */
    @Override
    public int getPartition(NewOrderBean key, NullWritable value, int numPartitions) {
        // Clear the sign bit so a negative hashCode still yields a non-negative index.
        final int nonNegativeHash = key.getOrderId().hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}







分组（GroupingComparator）：orderId 相同的 key 进入同一次 reduce 调用




/**
 * Grouping comparator: two keys belong to the same reduce group when their order ids are equal,
 * regardless of their total price.
 */
public class OrderGrouping extends WritableComparator {

    /** Registers {@link NewOrderBean} as the key type; {@code true} asks the parent to create key instances. */
    public OrderGrouping() {
        super(NewOrderBean.class, true);
    }

    /**
     * Compares two keys by order id only.
     *
     * @param a first key, expected to be a {@link NewOrderBean}
     * @param b second key, expected to be a {@link NewOrderBean}
     * @return negative, zero or positive as {@code a}'s order id sorts before, equal to, or after {@code b}'s
     */
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftId = ((NewOrderBean) a).getOrderId();
        String rightId = ((NewOrderBean) b).getOrderId();
        return leftId.compareTo(rightId);
    }
}







主程序（main）：组装并提交作业




/**
 * Job that emits, for every order, its top-N lines by total price.
 *
 * <p>The sort order comes from {@link NewOrderBean#compareTo} (order id ascending, total price
 * descending). Partitioning and grouping are by order id, so each {@code reduce} call sees one order's
 * lines, most expensive first.
 *
 * <p>Usage: {@code GroupComparator [input [output [topN]]]}. Omitted arguments fall back to the
 * original hard-coded defaults.
 */
public class GroupComparator {

    /** Configuration key holding how many lines to emit per order. */
    public static final String TOP_N_KEY = "order.top.n";

    /** Default for {@link #TOP_N_KEY}. */
    public static final int DEFAULT_TOP_N = 3;

    private static final String DEFAULT_INPUT = "e:/mrdata/wordcount/input";
    private static final String DEFAULT_OUTPUT = "e:/mrdata/wordcount/output12";

    /** Number of comma-separated fields a valid input line must contain. */
    private static final int EXPECTED_FIELDS = 5;

    /** Parses {@code orderId,user,name,price,num} lines into {@link NewOrderBean} keys. */
    public static class GroupMapper extends Mapper<LongWritable, Text, NewOrderBean, NullWritable> {

        // Reused across records: Hadoop serializes the key during context.write, so mutating it afterwards is safe.
        private final NewOrderBean orderBean = new NewOrderBean();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, NewOrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < EXPECTED_FIELDS) {
                // Blank or truncated line: count it instead of failing the task on fields[4].
                context.getCounter("GroupComparator", "MALFORMED_LINES").increment(1);
                return;
            }
            orderBean.set(fields[0], fields[1], fields[2], fields[3], fields[4]);
            context.write(orderBean, NullWritable.get());
        }
    }

    /** Writes at most {@code order.top.n} keys per order group. */
    public static class GroupReducer extends Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable> {

        private int topN = DEFAULT_TOP_N;

        /** Reads the per-order limit once per task instead of once per group. */
        @Override
        protected void setup(Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            topN = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
        }

        @Override
        protected void reduce(NewOrderBean key, Iterable<NullWritable> values,
                Reducer<NewOrderBean, NullWritable, NewOrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            int written = 0;
            for (NullWritable v : values) {
                // Check before writing so that topN <= 0 emits nothing (the old ++i == topn never matched).
                if (written >= topN) {
                    break;
                }
                // Hadoop refreshes the reused `key` object as the values iterator advances,
                // so each write emits the current record of the group.
                context.write(key, v);
                written++;
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [input [output [topN]]]}
     */
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        int topN = args.length > 2 ? Integer.parseInt(args[2]) : DEFAULT_TOP_N;

        Configuration conf = new Configuration();
        conf.setInt(TOP_N_KEY, topN);
        Job job = Job.getInstance(conf);

        job.setJarByClass(GroupComparator.class);

        job.setMapperClass(GroupMapper.class);
        // Route all lines of one order to the same reducer.
        job.setPartitionerClass(OrderPartitioner.class);
        job.setReducerClass(GroupReducer.class);

        // Group keys by order id so one reduce call covers a whole order.
        job.setGroupingComparatorClass(OrderGrouping.class);

        job.setMapOutputKeyClass(NewOrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(NewOrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.setNumReduceTasks(2);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : -1);
    }
}



发表评论:
昵称

邮件地址 (选填)

个人主页 (选填)

内容