MapReduce 编程模型
- MapReduce是什么,适合做什么,不适合做什么
- MapReduce中map和reduce方法的功能
- 开发MapReduce版本的WordCount程序并提交到集群运行
MapReduce是什么
MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算。对于大数据量的计算,通常采用的处理手法就是并行计算。但对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,它使得那些没有多有多少并行计算经验的开发人员也可以开发并行应用程序。这也就是MapReduce的价值所在,通过简化编程模型,降低了开发并行应用的入门门槛。
MapReduce设计目标
MapReduce采用的是“分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个子节点共同完成,然后整合各个子节点的中间结果,得到最终的计算结果。
MapReduce的特点
- 易于编程
- 良好的拓展性
- 高容错性
- 能对PB级以上海量数据进行离线处理
MapReduce不擅长的场景
MapReduce编程模型
MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
[h1]MapReduce框架的组成[/h1]
(1)JobTracker
JobTracker负责调度构成一个作业的所有任务,这些任务分布在不同的TaskTracker上(由上图的JobTracker可以看到2 assign map 和 3 assign reduce)。你可以将其理解为公司的项目经理,项目经理接受项目需求,并划分具体的任务给下面的开发工程师。
(2)TaskTracker
TaskTracker负责执行由JobTracker指派的任务,这里我们就可以将其理解为开发工程师,完成项目经理安排的开发任务即可。
Map阶段由一定数量的MapperTask组成
读数据:读取源数据,maptask获取分片数据信息(类型有:TextInputFormat,文本文件;SequenceFileInputFormat,序列化文件;DBInputFomrat,数据库文件), 形成key-value数据;
逻辑处理:通过循环调用Mapper类的map方法读取每行数据进行处理;
分区:通过Partitioner类的getPartition()方法对数据进行分区(默认执行HashPartitioner,分发规则:(key的hashcode值&Integer.MAX_VALUE)%numReducetTasks),分区规则注明分区号相同的数据会被分发给同一reducetask(只要按照规则就会返回相同的分区号);
排序:将数据通过key的compareTo()方法比较排序(默认是普通的字典排序);
Reduce 阶段由一定数量的Reducer Task 组成
读数据:reducetask会通过http方式下载各自处理的“区”的数据到本地磁盘,并合并排序,执行默认的GroupingComparator确定数据key相同的为同一组(我们在自定义的时候写一个类A继承WritableComparator,根据需求重写compare()方法,因为要从磁盘上读取数据,那么需要反序列化,需要在A的构造函数中告知WritableComparator反序列化的类型,否则会出错);
处理数据:reducetask把相同key的数据值聚合到Reducer类,按照reduce()方法处理逻辑,输出数据(输出类型:TextOutputFomat,文件类型;SequenceFileOutputFomrat,序列化文件;DBOutputFomrat,数据库数据文件);
[h1]MapReduce工作机制[/h1]
MapReduce的整个工作过程如上图所示,它包含如下4个独立的实体:
实体一:客户端,用来提交MapReduce作业。
实体二:JobTracker,用来协调作业的运行。
实体三:TaskTracker,用来处理作业划分后的任务。
实体四:HDFS,用来在其它实体间共享作业文件。
通过审阅MapReduce的工作流程图,可以看出MapReduce整个工作过程有序地包含如下工作环节:
在Hadoop中,一个MapReduce作业通常会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式去处理它们。框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。
通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上,也就是说,计算节点和存储节点通常都是在一起的。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使得整个集群的网络带宽被非常高效地利用。
[h1]MapReduce的输入输出[/h1]MapReduce框架运转在键值对上,也就是说,框架把作业的输入看成是一组键值对,同样也产生一组键值对作为作业的输出,这两组键值对有可能是不同的。
一个MapReduce作业的输入和输出类型如下图所示:可以看出在整个流程中,会有三组键值对类型的存在。
[h1][/h1][h1][/h1][h1]MapReduce的处理流程[/h1]这里以WordCount单词计数为例,介绍map和reduce两个阶段需要进行哪些处理。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:
(1)map任务处理
(2)reduce任务处理
WordCount demo
- [/code][code]import org.apache.hadoop.conf.Configuration;
复制代码- import org.apache.hadoop.fs.FileSystem;
复制代码- import org.apache.hadoop.fs.Path;
复制代码- import org.apache.hadoop.io.IntWritable;
复制代码- import org.apache.hadoop.io.Text;
复制代码- import org.apache.hadoop.mapreduce.Job;
复制代码- import org.apache.hadoop.mapreduce.Mapper;
复制代码- import org.apache.hadoop.mapreduce.Reducer;
复制代码- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
复制代码- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
复制代码- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
复制代码- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
复制代码- [/code][code]import java.io.IOException;
复制代码- import java.net.URISyntaxException;
复制代码- public class WrodCountApp {
复制代码- [/code][code] public static class MyMapper extends Mapper {
复制代码- private final static IntWritable one = new IntWritable(1);
复制代码- private Text word = new Text();
复制代码- [/code][code] public void map(IntWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
复制代码- String[] splited = value.toString().split(" ");
复制代码- for (String word : splited) {
复制代码- context.write(new Text(word), new IntWritable(1));
复制代码- [/code][code] public static class MyReducer extends Reducer {
复制代码- [/code][code] private IntWritable result = new IntWritable();
复制代码- [/code][code] public void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
复制代码- for (IntWritable val : values) {
复制代码- context.write(key, new IntWritable(sum));
复制代码- [/code][code] public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
复制代码- String INPUT_PATH = "hdfs://hadoop:8020/wc";
复制代码- String OUT_PATH = "hdfs://hadoop:8020/outputwc";
复制代码- [/code][code] Configuration configuration = new Configuration();
复制代码- final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
复制代码- [/code][code] if (fileSystem.exists(new Path(OUT_PATH))) {
复制代码- fileSystem.delete(new Path(OUT_PATH), true);
复制代码- Job job = Job.getInstance(configuration, "WordCountApp");
复制代码- [/code][code] // run jar class
复制代码- job.setJarByClass(WrodCountApp.class);
复制代码- job.setMapperClass(MyMapper.class);
复制代码- job.setMapOutputKeyClass(Text.class);
复制代码- job.setOutputValueClass(IntWritable.class);
复制代码- [/code][code] // 设置reduce
复制代码- job.setReducerClass(MyReducer.class);
复制代码- job.setOutputKeyClass(Text.class);
复制代码- job.setOutputValueClass(IntWritable.class);
复制代码- [/code][code] // 设置input format
复制代码- job.setInputFormatClass(TextInputFormat.class);
复制代码- Path inputPath = new Path(INPUT_PATH);
复制代码- FileInputFormat.setInputPaths(job, inputPath);
复制代码- [/code][code] // 设置output format
复制代码- job.setOutputFormatClass(TextOutputFormat.class);
复制代码- Path outPath = new Path(OUT_PATH);
复制代码- FileOutputFormat.setOutputPath(job, outPath);
复制代码- System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码- [/code]
- [list][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][/list][code] /**
复制代码- * @param KEYIN →k1 表示每一行的起始位置(偏移量offset)
复制代码- * @param VALUEIN →v1 表示每一行的文本内容
复制代码- * @param KEYOUT →k2 表示每一行中的每个单词
复制代码- * @param VALUEOUT →v2 表示每一行中的每个单词的出现次数,固定值为1
复制代码- public static class MyMapper extends Mapper {
复制代码- private final static IntWritable one = new IntWritable(1);
复制代码- private Text word = new Text();
复制代码- [/code][code] public void map(IntWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
复制代码- String[] splited = value.toString().split(" ");
复制代码- for (String word : splited) {
复制代码- context.write(new Text(word), new IntWritable(1));
复制代码 Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;从代码中可以看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是因为HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。 这里的map函数中通过空格符号来分割文本内容,并对其进行记录;
- * @param KEYIN →k2 表示每一行中的每个单词
复制代码- * @param VALUEIN →v2 表示每一行中的每个单词的出现次数,固定值为1
复制代码- * @param KEYOUT →k3 表示每一行中的每个单词
复制代码- * @param VALUEOUT →v3 表示每一行中的每个单词的出现次数之和
复制代码- public static class MyReducer extends Reducer {
复制代码- [/code][code] private IntWritable result = new IntWritable();
复制代码- [/code][code] public void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
复制代码- for (IntWritable val : values) {
复制代码- context.write(key, new IntWritable(sum));
复制代码 Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型通常和map的输出key、value类型保持一致)和输出的key、value 类型。
这里的reduce函数主要是将传入的进行最后的合并统计,形成最后的统计结果。
- public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
复制代码- String INPUT_PATH = "hdfs://hadoop:8020/wc";
复制代码- String OUT_PATH = "hdfs://hadoop:8020/outputwc";
复制代码- [/code][code] Configuration configuration = new Configuration();
复制代码- final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
复制代码- [/code][code] if (fileSystem.exists(new Path(OUT_PATH))) {
复制代码- fileSystem.delete(new Path(OUT_PATH), true);
复制代码- Job job = Job.getInstance(configuration, "WordCountApp");
复制代码- [/code][code] // run jar class
复制代码- job.setJarByClass(WrodCountApp.class);
复制代码- job.setMapperClass(MyMapper.class);
复制代码- job.setMapOutputKeyClass(Text.class);
复制代码- job.setOutputValueClass(IntWritable.class);
复制代码- [/code][code] // 设置reduce
复制代码- job.setReducerClass(MyReducer.class);
复制代码- job.setOutputKeyClass(Text.class);
复制代码- job.setOutputValueClass(IntWritable.class);
复制代码- [/code][code] // 设置input format
复制代码- job.setInputFormatClass(TextInputFormat.class);
复制代码- Path inputPath = new Path(INPUT_PATH);
复制代码- FileInputFormat.setInputPaths(job, inputPath);
复制代码- [/code][code] // 设置output format
复制代码- job.setOutputFormatClass(TextOutputFormat.class);
复制代码- Path outPath = new Path(OUT_PATH);
复制代码- FileOutputFormat.setOutputPath(job, outPath);
复制代码- System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码- [/code]在Main函数中,主要做了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交作业;
- [list][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][*][/list][code]public static void main(String[] args) throws Exception {
复制代码- Configuration conf = new Configuration();
复制代码- [/code][code] // 0.0:首先删除输出路径的已有生成文件
复制代码- FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
复制代码- Path outPath = new Path(OUTPUT_PATH);
复制代码- if (fs.exists(outPath)) {
复制代码- fs.delete(outPath, true);
复制代码- [/code][code] Job job = new Job(conf, "WordCount");
复制代码- job.setJarByClass(MyWordCountJob.class);
复制代码- [/code][code] // 1.0:指定输入目录
复制代码- FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
复制代码- // 1.1:指定对输入数据进行格式化处理的类(可以省略)
复制代码- job.setInputFormatClass(TextInputFormat.class);
复制代码- job.setMapperClass(MyMapper.class);
复制代码- // 1.3:指定map输出的类型(如果的类型与的类型一致则可以省略)
复制代码- job.setMapOutputKeyClass(Text.class);
复制代码- job.setMapOutputValueClass(LongWritable.class);
复制代码- job.setPartitionerClass(HashPartitioner.class);
复制代码- // 1.5:设置要运行的Reducer的数量(可以省略)
复制代码- job.setNumReduceTasks(1);
复制代码- job.setReducerClass(MyReducer.class);
复制代码- job.setOutputKeyClass(Text.class);
复制代码- job.setOutputValueClass(LongWritable.class);
复制代码- FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
复制代码- // 1.9:指定对输出数据进行格式化处理的类(可以省略)
复制代码- job.setOutputFormatClass(TextOutputFormat.class);
复制代码- boolean success = job.waitForCompletion(true);
复制代码- System.out.println("Success");
复制代码- System.out.println("Failed");
复制代码 |
|