Hadoop Inverted Index

An inverted index maps each word to the documents that contain it, together with the word's frequency in each document. The four classes below build such an index in a single MapReduce job: a Mapper, a Combiner, a Reducer, and a driver class that wires them together.

package cn.mr.InvertedIndex;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    // Holds the word-and-URL combination used as the output key
    private static Text keyInfo = new Text();
    // Holds the term frequency, initialized to 1
    private static final Text valueInfo = new Text("1");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line into an array of word fields
        String[] fields = StringUtils.split(line, " ");
        // Get the file split this line belongs to
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        // Derive the file name from the split
        String fileName = fileSplit.getPath().getName();

        for (String field : fields) {
            // The key combines the word and the URL, e.g. "MapReduce:file1"
            keyInfo.set(field + ":" + fileName);
            context.write(keyInfo, valueInfo);
        }
    }
}
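To make the map stage concrete, suppose (a hypothetical example) an input file named file1 contains the line "MapReduce is simple". For that line the mapper emits one <word:file, frequency> pair per word:

<"MapReduce:file1", "1">
<"is:file1", "1">
<"simple:file1", "1">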

package cn.mr.InvertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private static Text info = new Text();

    // Input:  <"MapReduce:file1", {1, 1, ...}>
    // Output: <"MapReduce", "file1:2">
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the term frequency
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }

        int splitIndex = key.toString().indexOf(":");
        // Rebuild the value as "URL:frequency"
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // Reset the key to just the word
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}
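The combiner runs on each map task's local output and collapses the counts for one word in one file. Continuing the hypothetical example, if file1 contained MapReduce twice, the combiner turns <"MapReduce:file1", {1, 1}> into <"MapReduce", "file1:2">. One caveat worth knowing: this combiner also rewrites the key, which only type-checks because the map output and reduce input types are both <Text, Text>. Since Hadoop is free to apply a combiner zero or more times, a combiner that performs a required transformation rather than a pure optimization is fine for a teaching example but risky in production code.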

package cn.mr.InvertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private static Text result = new Text();

    // Input:  <"MapReduce", {"file1:2", "file2:1", ...}>
    // Output: <"MapReduce", "file1:2;file2:1;">
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Build the document list for this word
        StringBuilder fileList = new StringBuilder();
        for (Text value : values) {
            fileList.append(value.toString()).append(";");
        }

        result.set(fileList.toString());
        context.write(key, result);
    }
}
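The reducer concatenates every "file:frequency" entry for a word into one semicolon-separated document list. For the hypothetical files above, the reduce input <"MapReduce", {"file1:2", "file2:1"}> produces the final index line (key and value separated by a tab, TextOutputFormat's default):

MapReduce	file1:2;file2:1;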

package cn.mr.InvertedIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexRunner {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InvertedIndexRunner.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Backslashes must be escaped in Java string literals
        FileInputFormat.setInputPaths(job, new Path("D:\\InvertedIndex\\input"));
        // Directory where the results are written
        FileOutputFormat.setOutputPath(job, new Path("D:\\InvertedIndex\\output"));

        // Submit the job to the YARN cluster and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
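One practical note: the job fails at submission if the output directory already exists, because FileOutputFormat checks for it. A minimal sketch of a guard that could be added to main() before waitForCompletion, assuming the same hard-coded path as above (it needs an additional import of org.apache.hadoop.fs.FileSystem):

FileSystem fs = FileSystem.get(conf);
Path output = new Path("D:\\InvertedIndex\\output");
// Delete the leftover output directory from a previous run, if any
if (fs.exists(output)) {
    fs.delete(output, true); // true = delete recursively
}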
