package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * The four type parameters of Mapper are:
 *   1st (input key):   LongWritable - the offset of the input line
 *   2nd (input value):  Text - the content of one line
 *   3rd (output key):   Text - a single word
 *   4th (output value): IntWritable - the count emitted for that word
 * @author kgf
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    /**
     * The map method processes the input one line at a time.
     * key:   the offset of the line
     * value: the content of the line
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // First convert the line to a String
        String line = value.toString();
        // Words in the data are separated by spaces
        String[] words = line.split(" ");
        // Emit each word to the next stage
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
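As an illustrative trace (not part of the original post): if an input line is "hello world hello", this Mapper emits the pairs (hello, 1), (world, 1), (hello, 1); the framework then groups the pairs by key during the shuffle before they reach the Reducer.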
⑶ Create a custom Reducer class to aggregate the grouped data
package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Note: the first two type parameters of Reducer must match the Mapper's output types.
 * @author kgf
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * This method aggregates the values that were grouped by key after the map phase.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the counts for this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Emit the total count for the word
        context.write(key, new IntWritable(sum));
    }
}
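Continuing the illustrative trace above: for the key "hello", the Reducer receives the value list (1, 1, 1), sums it to 3, and writes (hello, 3) to the output.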
⑷ Create a Driver class to submit the job
package com.kgf.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1: Get the job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2: Set the jar location via the driver class; Hadoop locates the jar itself
        job.setJarByClass(WordCountDriver.class);
        // 3: Wire up the custom mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4: Set the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5: Set the reducer (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6: Set the input and output paths, taken here from the command-line arguments
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7: Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
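Once the three classes are packaged into a jar, the job can be submitted from the command line. A minimal sketch, assuming the jar is named wordcount.jar and the HDFS paths /user/kgf/input and /user/kgf/output are chosen for illustration (the output directory must not exist before the job runs):

hadoop jar wordcount.jar com.kgf.mapreduce.WordCountDriver /user/kgf/input /user/kgf/output

After the job finishes, the word counts can typically be inspected with hadoop fs -cat /user/kgf/output/part-r-00000.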
Reposted from: http://hfql.baihongyu.com/