Hadoop data sorting — a MapReduce Top-N example (mapper, reducer, and driver classes).

package cn.mr.topn;

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that keeps the {@value #TOP_N} largest integers seen in its input split.
 *
 * <p>Input: {@code <byteOffset, "10 3 8 7 6 5 1 2 9 4">} — a line of space-separated
 * integers. Output (emitted once, in {@link #cleanup}): the per-mapper top
 * {@value #TOP_N} values as {@code <NullWritable, IntWritable>} pairs.
 */
public class TopNMapper extends Mapper<LongWritable, Text, NullWritable, IntWritable> {

    /** Number of largest values to retain per mapper. */
    private static final int TOP_N = 5;

    // Natural (ascending) ordering: firstKey() is the smallest retained value,
    // so evicting firstKey() keeps the TOP_N largest. The value is unused.
    private final TreeMap<Integer, String> repToRecordMap = new TreeMap<Integer, String>();

    /**
     * Parses each line and accumulates values into the bounded TreeMap.
     * Nothing is written here; emission is deferred to {@link #cleanup}.
     */
    @Override
    public void map(LongWritable key, Text value, Context context) {
        String line = value.toString();
        String[] nums = line.split(" ");
        for (String num : nums) {
            // Guard against empty tokens produced by leading/repeated spaces,
            // which would make Integer.parseInt throw.
            if (num.isEmpty()) {
                continue;
            }
            repToRecordMap.put(Integer.parseInt(num), " ");
            if (repToRecordMap.size() > TOP_N) {
                // Evict the smallest key so only the TOP_N largest remain.
                repToRecordMap.remove(repToRecordMap.firstKey());
            }
        }
    }

    /**
     * Emits the retained top values once all input has been mapped.
     * Declares the framework's checked exceptions instead of swallowing them,
     * so a failed write correctly fails the task.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Integer i : repToRecordMap.keySet()) {
            context.write(NullWritable.get(), new IntWritable(i));
        }
    }
}

package cn.mr.topn;

import java.io.IOException;

import java.util.Comparator;

import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer {


private TreeMap repToRecordMap = new TreeMap(new Comparator() {


//返回一个基本类型的整型,谁大谁排后面.

//返回负数表示:o1 小于o2

//返回0表示:表示:o1和o2相等

//返回正数表示:o1大于o2。

public int compare(Integer a, Integer b) {

return b - a;

}

});

public void reduce(NullWritable key, Iterable values, Context context)

throws IOException, InterruptedException {

for (IntWritable value : values) {

repToRecordMap.put(value.get(), " ");

if (repToRecordMap.size() > 5) {

repToRecordMap.remove(repToRecordMap.firstKey());

}

}

for (Integer i : repToRecordMap.keySet()) {

context.write(NullWritable.get(), new IntWritable(i));

}

}

}

package cn.mr.topn;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopNRunner {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(TopNRunner.class);

job.setMapperClass(TopNMapper.class);

job.setReducerClass(TopNReducer.class);

job.setNumReduceTasks(1);

job.setMapOutputKeyClass(NullWritable.class);// map阶段的输出的key

job.setMapOutputValueClass(IntWritable.class);// map阶段的输出的value

job.setOutputKeyClass(NullWritable.class);// reduce阶段的输出的key

job.setOutputValueClass(IntWritable.class);// reduce阶段的输出的value

FileInputFormat.setInputPaths(job, new Path("D:\topN\input"));

FileOutputFormat.setOutputPath(job, new Path("D:\topN\output"));

boolean res = job.waitForCompletion(true);

System.exit(res ? 0 : 1);

}

}

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章