Hadoop MapReduce Quick Start
Requirement: turn every line of text into (word, 1) key/value pairs,
then aggregate the pairs that share the same word by summing up their values.

For example, you can create several files whose contents look like: hello java hello python ...
After the computation the result would be: hello 2, java 1, python 1

Preliminaries
Data preparation: three input files

a.txt:
hello java
hello python
hello hadoop
hello spark

b.txt:
hello hadoop
hello python
hello python
hello python

c.txt:
hello hadoop
word hadoop
hello spark
hello spark
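For reference (my own tally of the sample files above, not part of the original write-up), running the job on exactly these three files should produce the following combined counts:

hadoop  4
hello   11
java    1
python  4
spark   3
word    1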
Start Hadoop (in my setup the NameNode and the DataNode both run on localhost):

hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
Start YARN (also on localhost):

start-yarn.sh
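Optionally (my addition, not one of the original steps), the JDK's jps tool gives a quick sanity check that the daemons are actually up; you should see NameNode, DataNode, ResourceManager, and NodeManager among the listed processes:

jps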
Create a directory in HDFS and upload a.txt, b.txt, and c.txt:

hadoop fs -mkdir -p /wordcount/input
hadoop fs -put *.txt /wordcount/input
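To confirm the upload worked (again my addition, assuming the same path as above), list the input directory; all three files should show up:

hadoop fs -ls /wordcount/input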
The Mapper class for the map task: map() is called once for every line read, turning a line such as "hello java" into (hello,1) (java,1).

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the line on spaces and emit (word, 1) for every word
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

The Reducer class for the reduce task: it receives the map output grouped by word and sums up the values for each one.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate the counts for this word
        int count = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()) {
            IntWritable value = iterator.next();
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

The driver program that configures and submits the job. With the mapreduce.framework.name line commented out it runs in Hadoop's built-in local (simulated) mode; uncomment it to submit the job to YARN.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitterLinuxToYarn {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("mapreduce.framework.name", "yarn"); // submit the job to YARN; the default is local

        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitterLinuxToYarn.class);

        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output"));

        job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Compile and package the .java files on Linux
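One caveat worth adding (standard MapReduce behavior, not mentioned in the original): the job aborts with a FileAlreadyExistsException if /wordcount/output already exists, so delete it before re-running the job:

hadoop fs -rm -r /wordcount/output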
1.1 Upload the code to the server
1.2 Compile and package:

[root@node1 java_jar]# javac *.java -cp $(hadoop classpath)
[root@node1 java_jar]# vi MANIFEST.MF    # add the line: Main-Class: JobSubmitterLinuxToYarn
[root@node1 java_jar]# jar -cvfm mr.jar MANIFEST.MF *.class
[root@node1 java_jar]# ls
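If you want to double-check the packaging before submitting (my addition), you can list the jar's contents and print its manifest; the three .class files and the Main-Class entry should be there:

[root@node1 java_jar]# jar -tf mr.jar
[root@node1 java_jar]# unzip -p mr.jar META-INF/MANIFEST.MF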
Run the job:

[root@node1 java_jar]# hadoop jar mr.jar JobSubmitterLinuxToYarn

The console prints the job's map/reduce progress and final counters as it runs.
Check the output directory in HDFS:

[root@node1 java_jar]# hadoop fs -ls /wordcount/output
[root@node1 java_jar]# hadoop fs -cat /wordcount/output/par*

Since the job was configured with three reduce tasks, the result is split across three part-r-0000x files; the cat on par* prints them all together.
If the job is scheduled on YARN, you can open http://node1:8088/cluster to watch it.
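The same information is also available from the command line (my addition, not in the original), which lists applications in all states including finished ones:

yarn application -list -appStates ALL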
Console output (screenshot omitted).