How to use multiple MapReduce jobs together (an example found online; original post: http://www.cnblogs.com/yjmyzz/p/4540469.html)
Published: 2019-06-25


Complex MapReduce processing is often broken down into several simple jobs, where the output of job 1 becomes the input of job 2 and so on, forming a chain of dependencies. Taking the computation of an average as an example, the work can be decomposed into three steps:

1. Compute the sum

2. Compute the count

3. Compute the average (sum / count)

Each step is treated as its own job. Job 3 must wait for jobs 1 and 2 to finish and takes their outputs as its input. The code below shows how to chain these three jobs together.

Code:

package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Avg2 {

    private static final Text TEXT_SUM = new Text("SUM");
    private static final Text TEXT_COUNT = new Text("COUNT");
    private static final Text TEXT_AVG = new Text("AVG");

    // Job 1: sum the number of characters on every input line
    public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        public long sum = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            sum += value.toString().length();
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        public long sum = 0;

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                sum += v.get();
            }
            context.write(TEXT_SUM, new LongWritable(sum));
        }
    }

    // Job 2: count the number of input lines
    public static class CountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        public long count = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            count += 1;
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        public long count = 0;

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(TEXT_COUNT, new LongWritable(count));
        }
    }

    // Job 3: read the SUM and COUNT records written by jobs 1 and 2
    // and compute the average
    public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
        public long count = 0;
        public long sum = 0;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] v = value.toString().split("\t");
            if (v[0].equals("COUNT")) {
                count = Long.parseLong(v[1]);
            } else if (v[0].equals("SUM")) {
                sum = Long.parseLong(v[1]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(sum), new LongWritable(count));
        }
    }

    public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
        public long sum = 0;
        public long count = 0;

        @Override
        protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            sum += key.get();
            for (LongWritable v : values) {
                count += v.get();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(TEXT_AVG, new DoubleWritable((double) sum / count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String inputPath = "E:/wordcount/input/a.txt";
        String maxOutputPath = "E:/wordcount/output/max/";     // output of the Sum job
        String countOutputPath = "E:/wordcount/output/count/"; // output of the Count job
        String avgOutputPath = "E:/wordcount/output/avg/";     // output of the Average job

        Job job1 = Job.getInstance(conf, "Sum");
        job1.setJarByClass(Avg2.class);
        job1.setMapperClass(SumMapper.class);
        job1.setCombinerClass(SumReducer.class);
        job1.setReducerClass(SumReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job1, new Path(inputPath));
        FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));

        Job job2 = Job.getInstance(conf, "Count");
        job2.setJarByClass(Avg2.class);
        job2.setMapperClass(CountMapper.class);
        job2.setCombinerClass(CountReducer.class);
        job2.setReducerClass(CountReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job2, new Path(inputPath));
        FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

        Job job3 = Job.getInstance(conf, "Average");
        job3.setJarByClass(Avg2.class);
        job3.setMapperClass(AvgMapper.class);
        job3.setReducerClass(AvgReducer.class);
        job3.setMapOutputKeyClass(LongWritable.class);
        job3.setMapOutputValueClass(LongWritable.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(DoubleWritable.class);

        // Use the outputs of job1 and job2 as the input of job3
        FileInputFormat.addInputPath(job3, new Path(maxOutputPath));
        FileInputFormat.addInputPath(job3, new Path(countOutputPath));
        FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

        // Run job1 and job2 and wait for them to finish,
        // then run job3 on their results
        if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
            System.exit(job3.waitForCompletion(true) ? 0 : 1);
        }
    }
}
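The driver above serializes everything: job1 runs, then job2, then job3. Hadoop also ships a small scheduler for exactly this kind of dependency graph, JobControl and ControlledJob in org.apache.hadoop.mapreduce.lib.jobcontrol, which can additionally run job1 and job2 concurrently since they do not depend on each other. Below is a minimal sketch of the same three-job dependency expressed with that API; it assumes job1, job2 and job3 are the configured Job instances from main() above and would replace the final waitForCompletion block.

// Requires: import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
//           import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
ControlledJob cSum = new ControlledJob(job1, null);
ControlledJob cCount = new ControlledJob(job2, null);
ControlledJob cAvg = new ControlledJob(job3, null);
cAvg.addDependingJob(cSum);   // job3 starts only after job1 succeeds...
cAvg.addDependingJob(cCount); // ...and after job2 succeeds

JobControl control = new JobControl("avg-chain");
control.addJob(cSum);
control.addJob(cCount);
control.addJob(cAvg);

// JobControl is a Runnable: drive it on its own thread and poll until done
Thread t = new Thread(control);
t.setDaemon(true);
t.start();
while (!control.allFinished()) {
    Thread.sleep(500);
}
control.stop();

The 500 ms polling interval is arbitrary. A production driver would also inspect control.getFailedJobList() before declaring success, since allFinished() returns true whether the jobs succeeded or failed.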
Preparing to run:

Prepare the data file:

E:/wordcount/input/a.txt

The contents of the data file were shown as a screenshot in the original post, which has not survived; any plain-text file will do, since the three jobs together compute the file's average number of characters per line.

After running, E:\wordcount\output\count\part-r-00000 contains the COUNT record (the number of input lines), and E:\wordcount\output\max\part-r-00000 contains the SUM record (the total number of characters across all lines).

The final average is written to E:\wordcount\output\avg\part-r-00000 as an AVG record.
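As an illustration (with hypothetical input, not taken from the original post): if a.txt contained only the two lines "hadoop" and "spark", job 1 would write the tab-separated record SUM 11 (6 + 5 characters), job 2 would write COUNT 2, and job 3 would compute AVG 5.5.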

