package cn.mr.topn;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TopNMapper extends Mapper
{ private TreeMap
repToRecordMap = new TreeMap (); // <0,10 3 8 7 6 5 1 2 9 4>
//
@Override
public void map(LongWritable key, Text value, Context context) {
String line = value.toString();
String[] nums = line.split(" ");
for (String num : nums) {
repToRecordMap.put(Integer.parseInt(num), " ");
if (repToRecordMap.size() > 5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
}
@Override
protected void cleanup(Context context) {
for (Integer i : repToRecordMap.keySet()) {
try {
context.write(NullWritable.get(), new IntWritable(i));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
package cn.mr.topn;
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
public class TopNReducer extends Reducer
{
private TreeMap
repToRecordMap = new TreeMap (new Comparator () {
//返回一个基本类型的整型,谁大谁排后面.
//返回负数表示:o1 小于o2
//返回0表示:表示:o1和o2相等
//返回正数表示:o1大于o2。
public int compare(Integer a, Integer b) {
return b - a;
}
});
public void reduce(NullWritable key, Iterable
values, Context context) throws IOException, InterruptedException {
for (IntWritable value : values) {
repToRecordMap.put(value.get(), " ");
if (repToRecordMap.size() > 5) {
repToRecordMap.remove(repToRecordMap.firstKey());
}
}
for (Integer i : repToRecordMap.keySet()) {
context.write(NullWritable.get(), new IntWritable(i));
}
}
}
package cn.mr.topn;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TopNRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(TopNRunner.class);
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);
job.setNumReduceTasks(1);
job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的key
job.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value
job.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的key
job.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value
FileInputFormat.setInputPaths(job, new Path("D:\topN\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\topN\output"));
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
留言与评论(共有 0 条评论) “” |