分布式计算框架MapReduce_Senior

论坛 期权论坛 期权     
想不到一个好的ID   2019-7-7 21:03   2356   0
  • MapReduce 类型
  • MapReduce 输入格式
  • MapReduce 输出格式
  • MapReduce 中的Combiner Partitioner RecordReader的使用
  • 使用MapReduce完成join操作
  • 使用MapReduce完成排序操作
  • 使用MapReduce完成二次排序操作
  • 使用MapReduce完成小文件合并操作



MapReduce 类型

hadoop的MapReduce中,map函数和reduce函数遵循如下常规格式:

  1. map:(K1,V1)->list(K2,V2)
复制代码
  1. reduce:(K2,list(V2))->list(K3,K4)
复制代码
一般来说,map函数输人的键/值类型(K1和V1)不同于输出类型(K2和V2)。然而,reduce函数的输人类型必须与map函数的输出类型相同,但reduce函数的输出类型(K3和V3)可以不同于输人类型。例如以下Java接口代码:
  1. /**
复制代码
  1. * @param KEYIN    →k1 表示每一行的起始位置(偏移量offset)
复制代码
  1. * @param VALUEIN  →v1 表示每一行的文本内容
复制代码
  1. * @param KEYOUT   →k2 表示每一行中的每个单词
复制代码
  1. * @param VALUEOUT →v2 表示每一行中的每个单词的出现次数,固定值为1
复制代码
  1. */
复制代码
  1. public class Mapper {
复制代码
  1.     protected void map(KEYIN key, VALUEIN value, Mapper.Context context) throws IOException, InterruptedException {
复制代码
  1.         context.write(key, value);
复制代码
  1.     }
复制代码
  1.     public abstract class Context implements MapContext {
复制代码
  1.         public Context() {
复制代码
  1.         }
复制代码
  1.     }
复制代码
  1. }
复制代码
  1. /**
复制代码
  1. * @param KEYIN    →k2 表示每一行中的每个单词
复制代码
  1. * @param VALUEIN  →v2 表示每一行中的每个单词的出现次数,固定值为1
复制代码
  1. * @param KEYOUT   →k3 表示每一行中的每个单词
复制代码
  1. * @param VALUEOUT →v3 表示每一行中的每个单词的出现次数之和
复制代码
  1. */
复制代码
  1. public class Reducer {
复制代码
  1.     protected void reduce(KEYIN key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
复制代码
  1.         Iterator var4 = values.iterator();
复制代码
  1.         while(var4.hasNext()) {
复制代码
  1.             VALUEIN value = var4.next();
复制代码
  1.             context.write(key, value);
复制代码
  1.         }
复制代码
  1.     }
复制代码
  1.     public abstract class Context implements ReduceContext {
复制代码
  1.         public Context() {
复制代码
  1.         }
复制代码
  1.     }
复制代码
  1. }
复制代码
mapreduce 中常用的设置
  • 输入数据类型由输入格式(InputFormat)设置,比如TextInputFormat的key 类型就是LongWritable,value的类型是Text
  • map的输入的key的类型通过setMapOutputKeyClass 设置,Value的类型通过setMapOutputValueClass 设置
  • reduce的输出的key的类型通过setOutputKeyClass设置,Value的类型通过setOutputValue设置

MapReduce 输入格式

[h2]输入格式[/h2]
  • 输入分片与记录
  • 文件输入
  • 文本输入
  • 二进制输入
  • 多文件输入
  • 数据库格式输入
[h3]输入分片与记录[/h3]
  • JobClient通过指定的输入文件的格式来生成数据分片InputSplit。
  • 一个分片不是数据本身,而是可分片数据的引用。
  • InputFormat接口负责生成分片。

InputFormat 负责处理MR的输入部分,有三个作用:
  • 验证作业的输入是否规范。
  • 把输入文件切分成InputSplit。
  • 提供RecordReader 的实现类。把InputSplit读到Mapper中进行处理。





InputFormat接口

InputFormat接口包含了两个抽象方法:getSplits()和creatRecordReader()。InputFormat决定了Hadoop如何对文件进行分片和接收, 它能够从一个 job 中得到一个 split 集合(InputSplit[]),然后再为这个 split 集合配上一个合适的 RecordReader(getRecordReader)来读取每个split中的数据。InputFormat接口的实现细节如下。
  1. public abstract class Inputformat {
复制代码
  1.     public abstract List getSplits(JobContext context);        
复制代码
  1.     public abstract RecordReadercreatRecordReader(InputSplit split, TaskAttemptContext context);
复制代码
  1. }
复制代码
方法说明
getSplits(JobContext context)方法负责将一个大数据逻辑分成许多片,比如数据库表有100条数据,按照主键ID升序存储。假如每20条分成一片,这个list的大小就是5,然后每个InputSplit记录两个参数,第一个参数为这个分片的起始ID,第二参数为这个分片数据的大小,这里是20。很明显InputSplit并没有真正的存储数据,只是提供了一个如何将数据分片的方法。
creatRecordReader(InputSplit split, TaskAttemptContext context)方法根据InputSplit定义的方法,返回一个能够读取分片记录的RecordReader。getSplit用来获取有输入文件计算出来的InputSplit,后面将看到就是那InputSplit时,会考虑输入文件是否可以分割,文件存储时分块的大小和文件大小等因素:而createRecordReader()提供了前面说的RecordReader的实现,将key_value对从InputSplit中正确读出来,比如LineRecordReader,它是以偏移量为key,每行的数据为value,这使所有的数据为value的形式读取输入分片的

小总结:
Input Splits and Records
  • 每个map处理一个输入分片
    1. split
    复制代码
    ,输入分片是一个数据块
  • 每个分片内部包含若干记录
    1. record
    复制代码
    (key-value 对),由map依次处理
  • 都是逻辑概念,用来划分任务
  1. package org.apache.hadoop.mapreduce;
复制代码
  1. public abstract class InputSplit {
复制代码
  1.     //获取分片的大小(字节),用来排序,优先处理最大的分片,以最小化运行时间
复制代码
  1.     public abstract long getLength() throws IOException, InterruptedException;
复制代码
  1.     //存储位置(主机名称),用来调度map任务到分片数据附近
复制代码
  1.     public abstract String[] getLocations() throws IOException, InterruptedException;
复制代码
  1. }
复制代码
  1. InputSplit
复制代码
由实现
  1. InputFormat
复制代码
接口的类创建,该类同时负责并将他们分割成记录
  1. public abstract class InputFormat {
复制代码
  1.       //获取所有的split,从中可以得到map任务的数量
复制代码
  1.       public abstract List getSplits(JobContext context)
复制代码
  1.           throws IOException, InterruptedException;
复制代码
  1.       //获取输入分片内的记录
复制代码
  1.       public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)
复制代码
  1.           throws IOException, InterruptedException;
复制代码
  1. }
复制代码
  1. RecordReader
复制代码
类似于一个迭代器,用来获取split对应的键值对记录record


FileInputformat输入路径设置

FileInputformat为MapReduce中文件输入格式实现类的基类,其作用主要是指定输入文件路径,它提供了四种静态方法来指定输入路径。
  1. public static void addInputPath(Job job, Path path);
复制代码
  1. public static void addInputPaths(Job job, String commaSeparatedPaths);
复制代码
  1. public static void setInputPaths(Job job, Path inputPaths);
复制代码
  1. public static void setInputPaths(Job job, String commaSeparatedPaths);
复制代码
实例如下:
  1. FileInputFormat.addInputPath(job, newPath("Path1")); //设置一个源路径
复制代码
  1. FileInputFormat.addInputPaths(job," Path1, Path2,..."); //设置多个源路径,多个源路径之间用逗号分
复制代码
  1. FileInputFormat.setInputPaths(job,new Path(“path1”), new Path(“path2”),…); //可以包含多个源路径,
复制代码
  1. FileInputFormat.setInputPaths(job,”Path1”,” Path2,..."); //设置多个源路径,多个源路径之间用逗号分开
复制代码
设定过滤器
  1. //设定自定义的路径过滤器
复制代码
  1. public static void setInputPathFilter(Job job,
复制代码
  1.                                         Class[code]List getSplits(JobContext context )
复制代码


FileSplite字段信息
  • file:输入文件的路径
  • start:split起始位置的字节偏移
  • length:split长度
  • hosts:位置
  • hostInfos:记录位置以及标注数据是否位于内存
所以在FileInputFormat的具体子类在读取单个FileSplit时打开的文件是完整的文件,不是限定在FileSplit范围内的文件块,然后
  1. seek
复制代码
到对应的offset开始读取,除第一个split之外,每个split跳过第一条记录(第一条记录可能不完整),然后在到达结尾时确保超出split读取边界(读取下一个Split的第一条记录)


FileInputformat子类

这里我们介绍几种常用的文件类型的输入格式:TextInputformat,KeyValueTextInputformat,NLineInputformat和SequenceFileInputFormat。

TextInputformat

TextInputformat是一种文本输入格式,它也是MapReduce默认的输入格式。TextInputformat将每条记录作为一行输入,其键值是LongWritable 类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符合回车符),它们被打包成一个 Text 对象。

以下是一个示例,比如,一个分片包含了如下4条文本记录。
  1. Rich learning form
复制代码
  1. Intelligent learning engine
复制代码
  1. Learning more convenient
复制代码
  1. From the real demand for more close to the enterprise
复制代码
每条记录表示为以下键/值对:
  1. (0,Rich learning form)
复制代码
  1. (19,Intelligent learning engine)
复制代码
  1. (47,Learning more convenient)
复制代码
  1. (72,From the real demand for more close to the enterprise)
复制代码
很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行切分为分片。

KeyValueTextInputformat

KeyValueTextInputformat为键值对文本输入格式。每一行均为一条记录,被分隔符(缺省是tab(\t))分割为key(Text),value(Text)。

以下是一个示例,输入是一个包含4条记录的分片。其中——>表示一个(水平方向的)制表符,即tab键。

  1. line1 ——>Rich learning form
复制代码
  1. line2 ——>Intelligent learning engine
复制代码
  1. line3 ——>Learning more convenient
复制代码
  1. line4 ——>From the real demand for more close to the enterprise
复制代码
每条记录表示为以下键/值对:

  1. (line1,Rich learning form)
复制代码
  1. (line2,Intelligent learning engine)
复制代码
  1. (line3,Learning more convenient)
复制代码
  1. (line4,From the real demand for more close to the enterprise)
复制代码
此时的键是每行排在制表符之前的Text序列。

NLineInputformat

NLineInputformat为多行输入格式。

分别使用 TextInputFormat 和 KeyValueTextInputFormat输入格式时,每个 Mapper 收到的输入行数是不同的,其中的行数取决于输入分片的大小和行的长度。如果希望 Mapper 收到固定行数的输入,需要使用 NLineInputFormat 输入格式。与 TextInputFormat 一样, 键是文件中行的字节偏移量,值是行本身。N 是每个 Mapper 收到的输入行数。N 设置为1(默认值)时,每个 Mapper 正好收到一行输入。

以下是一个示例,仍然以上面的4行输入为例。
  1. Rich learning form
复制代码
  1. Intelligent learning engine
复制代码
  1. Learning more convenient
复制代码
  1. From the real demand for more close to the enterprise   
复制代码
例如,如果 N 是2,则每个输入分片包含两行。一个 mapper 收到前两行键值对:
  1. (0,Rich learning form)
复制代码
  1. (19,Intelligent learning engine)
复制代码
另一个 mapper 则收到后两行:
  1. (47,Learning more convenient)
复制代码
  1. (72,From the real demand for more close to the enterprise)
复制代码
这里的键和值与 TextInputFormat 生成的一样。

SequenceFileInputFormat

SequenceFileInputFormat为序列化的文件输入格式,用于读取sequence file。序列文件为Hadoop专用的压缩二进制文件格式,它专用于一个MapReduce作业和其他MapReduce作业之间的传送数据,使用于多个MapReduce作业之间的链接操作。

MultipleInputs(多输入格式处理)

虽然一个 MapReduce 作业的输入可能包含多个输入文件,但所有文件都由同一个 InputFormat 和 同一个 Mapper 来解释。然而,数据格式往往会随时间演变,所以必须写自己的 Mapper 来处理应用中的遗留数据格式问题。或者,有些数据源会提供相同的数据, 但是格式不同。

这些问题可以使用 MultipleInputs 类来妥善处理,它允许为每条输入路径指定 InputFormat 和 Mapper。例如,我们想把英国 Met Office 的气象站数据和美国NCDC 的气象站数据放在一起来统计平均气温,则可以按照下面的方式来设置输入路径。
  1. MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,NCDCTemperatureMapper.class);
复制代码
  1. MultipleInputs.addInputPath(job,metofficeInputPath,TextInputFormat.class,MetofficeTemperatureMapper.class);
复制代码
这样做目的就是为了,不论有多少个数据源、多少种数据格式,经过Map阶段处理,输出类型一样,Reduce不用关心输入格式。!!!

这段代码取代了对 FileInputFormat.addInputPath()和job.setMapperClass() 的常规调用。Met Office 和 NCDC 的数据都是文本文件,所以对两者都使用 TextInputFormat 数据类型。但这两个数据源的行格式不同,所以我们使用了两个不一样的 Mapper,分别为NCDCTemperatureMapper和MetofficeTemperatureMapper。重要的是两个 Mapper 的输出类型一样,因此,reducer 看到的是聚集后的 map 输出,并不知道这些输入是由不同的 Mapper 产生的。

DBInputFormat

DBInputFormat 这种输入格式用于使用 JDBC 从关系数据库中读取数据。因为它没有任何共享能力,所以在访问数据库的时候必须非常小心,在数据库中运行太多的 mapper 读数据可能会使数据库受不了。正是由于这个原因,DBInputFormat 最好用于加载少量的数据集。与之相对应的输出格式是DBOutputFormat,它适用于将作业输出数据(中等规模的数据)转存到数据库。

自定义Inputformat

有时候 Hadoop 自带的输入格式,并不能完全满足业务的需求,所以需要我们根据实际情况自定义 InputFormat 类。而数据源一般都是文件数据,那么自定义 InputFormat时继承 FileInputFormat 类会更为方便,从而不必考虑如何分片等复杂操作。自定义输入格式我们分为以下几步:
        1)继承 FileInputFormat 基类。
        2)重写 FileInputFormat 里面的 isSplitable() 方法。
        3)重写 FileInputFormat 里面的 createRecordReader()方法。

具体操作:
  • 如果是文本格式的数据,那么实现一个XXInputForamt继承FileInputFormat
  • 重写 FileInputFormat 里面的 isSplitable() 方法。如果文件是压缩文件的话则不能切割,一般都是支持切割
  • 重写 FileInputFormat 里面的 createRecordReader()方法
  • 自定义XXRecordReader,来读取特定的格式
  1. XXRecordReader中需要重点实现以下两个的方法
复制代码
  1.         @Override
复制代码
  1.         public void initialize(InputSplit input, TaskAttemptContext context)
复制代码
  1.                 throws IOException, InterruptedException {
复制代码
  1.             FileSplit split=(FileSplit)input;
复制代码
  1.             Configuration job=context.getConfiguration();
复制代码
  1.             Path file=split.getPath();
复制代码
  1.             FileSystem fs=file.getFileSystem(job);
复制代码
  1.            
复制代码
  1.             FSDataInputStream fileIn=fs.open(file);
复制代码
  1.             //红色标记这部分对于文本型数据来说基本是一样的
复制代码
  1.             in=new LineReader(fileIn,job);
复制代码
  1.             line=new Text();
复制代码
  1.             lineKey=new Text();
复制代码
  1.             lineValue = new Text();
复制代码
  1.         }
复制代码
  1. [/code][code]        //此方法读取每行数据,完成自定义的key和value
复制代码
  1.         @Override
复制代码
  1.         public boolean nextKeyValue() throws IOException, InterruptedException {
复制代码
  1.             int linesize=in.readLine(line);//每行数据
复制代码
  1.             if(linesize==0) return false;
复制代码
  1.             String[] pieces = line.toString().split("\\s+");//解析每行数据
复制代码
  1.             ...
复制代码
  1.             lineKey.set(“key”);//完成自定义key数据
复制代码
  1.             lineValue.set(“value”);//封装自定义value数据
复制代码
  1.             return true;
复制代码
  1.         }
复制代码
  1. [/code]
  2. [b]MapReduce 输出格式[/b]
  3. OutputFormat接口
  4. OutputFormat为输出格式接口,主要用于描述输出数据的格式,它能将输出的键值对写入特定格式的文件中。输出格式的层次结构如下
  5. [img]https://201907.oss-cn-shanghai.aliyuncs.com/wc/1811847-aa6823e90dd92160a256aa8390510e5d[/img]
  6. 2. 文本输出
  7. Hadoop默认的输出格式为文本输出格式TextOutputFormat,其键和值可以使任意类型的,因为该输出方式会调用toString()方法将它们转化为字符串。每个键/值对由制表符进行分割,当然也可以设定 mapreduce.output.textoutputformat.separator 属性改变默认的分隔符。
  8. 3. 二进制输出
  9. 二进制输出有三种方式:SequenceFileOutputFormat,SequenceFileAsBinaryOutputFormat和MapFileOutputFormat。重点掌握第一种。
  10. 对于SequenceFileOutputFormat,顾名思义,SequenceFileOutputFormat 将它的输出写为一个顺序文件。如果输出需要作为后续 MapReduce 任务的输入,这便是一种好的输出格式, 因为它的格式紧凑,并且很容易被压缩。而对于SequenceFileAsBinaryOutputFormat,它将键/值对作为二进制格式写到一个 SequenceFile 容器中。不同的是,MapFileOutputFormat 把 MapFile 作为输出。MapFile 中的键必须顺序添加,所以必须确保 reducer 输出的键已经排好序。
  11. 4. 多个输出
  12. 由于默认情况下只有一个 Reducer,输出只有一个文件。有时可能需要对输出的文件名进行控制或让每个 reducer 输出多个文件。
  13. 当只有一个reduce时,输出文件命名格式为:part-r-00000。当有两个reduce时,输出文件命名格式为:part-r-00000,part-r-00001。当有多个时以此类推。实现Reducer输出多个文件主要有以下两种方式:Partitioner和MultipleOutputs。
  14. 4.1 Partitioner
  15. 我们考虑这样一个需求:按学生的年龄段,将数据输出到不同的文件路径下。这里我们分为三个年龄段:小于等于20岁、大于20岁小于等于50岁和大于50岁。
  16. 我们采用的方法是每个年龄段对应一个 reducer。为此,我们需要通过以下两步实现。
  17. 第一步:把作业的 reducer 数设为年龄段数即为3。
  18. [list][*][*][*][/list][code] job.setPartitionerClass(PCPartitioner.class);//设置Partitioner类
复制代码
  1. [/code][code]  job.setNumReduceTasks(3);// reduce个数设置为3
复制代码
第二步:写一个 Partitioner,把同一个年龄段的数据放到同一个分区。
  1. public static class PCPartitioner extends Partitioner< Text, Text>
复制代码
  1. {
复制代码
  1.                   @Override
复制代码
  1.                   public int getPartition(Text key, Text value, int numReduceTasks) {
复制代码
  1.                           // TODO Auto-generated method stub
复制代码
  1.                           String[] nameAgeScore = value.toString().split("\t");
复制代码
  1.                           String age = nameAgeScore[1];//学生年龄
复制代码
  1.                           int ageInt = Integer.parseInt(age);//按年龄段分区
复制代码
  1.                           // 默认指定分区 0
复制代码
  1.                           if (numReduceTasks == 0)
复制代码
  1.                                    return 0;
复制代码
  1.                           //年龄小于等于20,指定分区0
复制代码
  1.                           if (ageInt  20 && ageInt ");
复制代码
  1.             }
复制代码
  1.             context.write(key, new LongWritable(count));
复制代码
  1.             // 显示次数表示输出的k2,v2的键值对数量
复制代码
  1.             System.out.println("Combiner输出键值对");
复制代码
  1.         };
复制代码
  1.     }
复制代码
  1. [/code]
  2. [b]Partitioner [/b]
  3. [b][/b]
  4. MapReduce的八大步凑,其中在Map阶段总共五个步骤,如下图所示:
  5. [img]https://201907.oss-cn-shanghai.aliyuncs.com/wc/1811847-0a6be0211a52f6b1a4480319f7dc754e[/img]
  6. 其中,step1.3就是一个分区操作。Mapper最终处理的键值对,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer节点中进行归并。[b]哪个key到哪个Reducer的分配过程,是由Partitioner规定的[/b]。在一些集群应用中,例如分布式缓存集群中,缓存的数据大多都是靠哈希函数来进行数据的均匀分布的,在Hadoop中也不例外。
  7. [img]https://201907.oss-cn-shanghai.aliyuncs.com/wc/1811847-b8569881dab1f8e9d5025419f36a2b0c[/img]
  8. [h1][b][/b][/h1][h1][b]Hadoop内置Partitioner[/b][/h1]  MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区,鉴于此,Hadoop中自带了一个默认的分区类HashPartitioner,它继承了Partitioner类,提供了一个getPartition的方法,它的定义如下所示:
  9. [list][*][*][*][*][*][*][*][*][*][/list][code]/** Partition keys by their {@link Object#hashCode()}. */
复制代码
  1. public class HashPartitioner extends Partitioner {
复制代码
  1.   /** Use {@link Object#hashCode()} to partition. */
复制代码
  1.   public int getPartition(K key, V value,
复制代码
  1.                           int numReduceTasks) {
复制代码
  1.     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
复制代码
  1.   }
复制代码
  1. [/code][code]}
复制代码
现在我们来看看HashPartitoner所做的事情,其关键代码就一句:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

这段代码实现的目的是将key均匀分布在Reduce Tasks上,例如:如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int整数。但是,如果string太大的话这个int整数值可能会溢出变成负数,所以和整数的上限值Integer.MAX_VALUE(即0111111111111111)进行与运算,然后再对reduce任务个数取余,这样就可以让key均匀分布在reduce上。


自定义Partitioner大部分情况下,我们都会使用默认的分区函数HashPartitioner。但有时我们又有一些特殊的应用需求,所以我们需要定制Partitioner来完成我们的业务。对日志内容做一个特殊的分区:


从上图中我们可以发现,在第二列上并不是所有的数据都是手机号(例如:84138413并不是一个手机号),我们任务就是在统计手机流量时,将手机号码和非手机号输出到不同的文件中。

[h1]自定义KpiPartitioner[/h1]
  1.      /*
复制代码
  1.      * 自定义Partitioner类
复制代码
  1.      */
复制代码
  1.     public static class KpiPartitioner extends Partitioner {
复制代码
  1.         @Override
复制代码
  1.         public int getPartition(Text key, KpiWritable value, int numPartitions) {
复制代码
  1.             // 实现不同的长度不同的号码分配到不同的reduce task中
复制代码
  1.             int numLength = key.toString().length();
复制代码
  1.             if (numLength == 11) {
复制代码
  1.                 return 0;
复制代码
  1.             } else {
复制代码
  1.                 return 1;
复制代码
  1.             }
复制代码
  1.         }
复制代码
  1.     }
复制代码
这里按手机和非手机号码的区分是按该字段的长度来划分,如果是11位则为手机号。接下来,就是重新修改run方法中的代码:设置为打包运行,设置Partitioner为KpiPartitioner,设置ReducerTask的个数为2
  1.     public int run(String[] args) throws Exception {
复制代码
  1.         // 首先删除输出目录已生成的文件
复制代码
  1.         FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
复制代码
  1.         Path outPath = new Path(OUTPUT_PATH);
复制代码
  1.         if (fs.exists(outPath)) {
复制代码
  1.             fs.delete(outPath, true);
复制代码
  1.         }
复制代码
  1.         // 定义一个作业
复制代码
  1.         Job job = new Job(getConf(), "MyKpiJob");
复制代码
  1.         // 分区需要设置为打包运行
复制代码
  1.         job.setJarByClass(MyKpiJob.class);
复制代码
  1.         // 设置输入目录
复制代码
  1.         FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
复制代码
  1.         // 设置自定义Mapper类
复制代码
  1.         job.setMapperClass(MyMapper.class);
复制代码
  1.         // 指定的类型
复制代码
  1.         job.setMapOutputKeyClass(Text.class);
复制代码
  1.         job.setMapOutputValueClass(KpiWritable.class);
复制代码
  1.         // 设置Partitioner
复制代码
  1.         job.setPartitionerClass(KpiPartitioner.class);
复制代码
  1.         job.setNumReduceTasks(2);
复制代码
  1.         // 设置Combiner
复制代码
  1.         job.setCombinerClass(MyReducer.class);
复制代码
  1.         // 设置自定义Reducer类
复制代码
  1.         job.setReducerClass(MyReducer.class);
复制代码
  1.         // 指定的类型
复制代码
  1.         job.setOutputKeyClass(Text.class);
复制代码
  1.         job.setOutputKeyClass(KpiWritable.class);
复制代码
  1.         // 设置输出目录
复制代码
  1.         FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
复制代码
  1.         // 提交作业
复制代码
  1.         System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码
  1.         return 0;
复制代码
  1.     }
复制代码
注意:分区的例子必须要设置为打成jar包运行!
[h1]2.2 打成jar包并在Hadoop中运行[/h1](1)导出jar包
(2)通过FTP上传到Linux中,可以使用各种FTP工具,我一般使用XFtp。
(3)通过Hadoop Shell执行jar包中的程序
  

(4)查看执行结果文件:
  首先是part-r-00000,它展示了手机号码的统计结果

  然后是part-r-00001,它展示了非手机号码的统计结果


(5)通过Web接口验证Partitioner的运行:通过访问http://hadoop-master:50030
  ①是否有2个Reduce任务?




  从图中可以看出,总共有2个Reduce任务;
  ②Reduce输出结果是否一致?


  手机号码有20条记录,一致!


  非手机号码只有1条记录,一致!
总结:分区Partitioner主要作用在于以下两点
(1)根据业务需要,产生多个输出文件;
(2)多个reduce任务并发运行,提高整体job的运行效率
Shuffle

[h1]概述[/h1]
1、MapReduce 中,mapper 阶段处理的数据如何传递给 reducer 阶段,是 MapReduce 框架中 最关键的一个流程,这个流程就叫 Shuffle
2、Shuffle: 数据混洗 ——(核心机制:数据分区,排序,局部聚合,缓存,拉取,再合并 排序)
3、具体来说:就是将 MapTask 输出的处理结果数据,按照 Partitioner 组件制定的规则分发 给 ReduceTask,并在分发的过程中,对数据按 key 进行了分区和排序

[h1]MapReduce的Shuffle过程介绍[/h1]MapReduce中的Shuffle,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?

我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和sort过程,如图所示:



[h2]Spill过程[/h2]
Spill过程包括输出、排序、溢写、合并等步骤,如图所示:


Collect
每个Map任务不断地以对的形式把数据输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:


Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。
索引是对在kvbuffer中的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如Kvindex初始位置是-4,当第一个写完之后,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的长度,然后Kvindex跳到-8位置,等第二个和索引写完之后,Kvindex跳到-32位置。
Kvbuffer的大小虽然可以通过参数设置,但是总共就那么大,和索引不断地增加,加着加着,Kvbuffer总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把Kvbuffer中的数据刷到磁盘上的过程就叫Spill,多么明了的叫法,内存中的数据满了就自动地spill到具有更大空间的磁盘。
关于Spill触发的条件,也就是Kvbuffer用到什么程度开始Spill,还是要讲究一下的。如果把Kvbuffer用得死死得,一点缝都不剩的时候再开始Spill,那Map任务就需要等Spill完成腾出空间之后才能继续写数据;如果Kvbuffer只是满到一定程度,比如80%的时候就开始Spill,那在Spill的同时,Map任务还能继续写数据,如果Spill够快,Map可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。
Spill这个重要的过程是由Spill线程承担,Spill线程从Map任务接到“命令”之后就开始正式干活,干的活叫SortAndSpill,原来不仅仅是Spill,在Spill之前还有个颇具争议性的Sort。

[h2]Sort[/h2]先把Kvbuffer中的数据按照partition值和key两个关键字升序排序,移动的只是索引数据,排序结果是Kvmeta中数据按照partition为单位聚集在一起,同一partition内的按照key有序。

[h2]Spill[/h2]Spill线程为这次Spill过程创建一个磁盘文件:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out”的文件。Spill线程根据排过序的Kvmeta挨个partition的把数据吐到这个文件中,一个partition对应的数据吐完之后顺序地吐下个partition,直到把所有的partition遍历完。一个partition在文件中对应的数据也叫段(segment)。
所有的partition对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个partition在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个partition对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个partition对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于“spill12.out.index”的文件,文件中不光存储了索引数据,还存储了crc32的校验数据。(spill12.out.index不一定在磁盘上创建,如果内存(默认1M空间)中能放得下就放在内存中,即使在磁盘上创建了,和spill12.out文件也不一定在同一个目录下。)
每一次Spill过程就会最少生成一个out文件,有时还会生成index文件,Spill的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:


在Spill线程如火如荼的进行SortAndSpill工作的同时,Map任务不会因此而停歇,而是一无既往地进行着数据输出。Map还是把数据写到kvbuffer中,那问题就来了:只顾着闷头按照bufindex指针向上增长,kvmeta只顾着按照Kvindex向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快bufindex和Kvindex就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map取kvbuffer中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex指针移动到这个分界点,Kvindex移动到这个分界点的-16位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当Spill完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:


Map任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

[h2]Merge[/h2]Map任务如果输出数据量很大,可能会进行好几次Spill,out文件和Index文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的merge过程闪亮登场。
Merge过程怎么知道产生的Spill文件都在哪了呢?从所有的本地目录上扫描得到产生的Spill文件,然后把路径存储在一个数组里。Merge过程又怎么知道Spill的索引信息呢?没错,也是从所有的本地目录上扫描得到Index文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前Spill过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是Spill的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时kvbuffer这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个io步骤还是值得考虑的。)
然后为merge过程创建一个叫file.out的文件和一个叫file.out.Index的文件用来存储最终的输出和索引。
一个partition一个partition的进行合并输出。对于某个partition来说,从索引列表中查询这个partition对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个partition对应一个段列表,记录所有的Spill文件中对应的这个partition那段数据的文件名、起始位置、长度等等。
然后对这个partition对应的所有的segment进行合并,目标是合并成一个segment。当这个partition对应很多个segment时,会分批地进行合并:先从segment列表中把第一批取出来,以key为关键字放置成最小堆,然后从最小堆中每次取出最小的输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到segment列表中;再从segment列表中把第二批取出来合并输出到一个临时segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。
最终的索引数据仍然输出到Index文件中。


Map端的Shuffle过程到此结束。

[h2]Copy[/h2]Reduce任务通过HTTP向各个Map任务拖取它所需要的数据。每个节点都会启动一个常驻的HTTP server,其中一项服务就是响应Reduce拖取Map数据。当有MapOutput的HTTP请求过来的时候,HTTP server就读取相应的Map输出文件中对应这个Reduce部分的数据通过网络流输出给Reduce。
Reduce任务拖取某个Map对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把内存中的数据merge输出到磁盘上一个文件中。
如果在内存中不能放得下这个Map的数据的话,直接把Map数据写到磁盘上,在本地目录创建一个文件,从HTTP流中读取数据然后写到磁盘,使用的缓存区大小是64K。拖一个Map数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件merge,把这些文件合并输出到一个文件。
有些Map的数据较小是可以放在内存中的,有些Map的数据较大需要放在磁盘上,这样最后Reduce任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。

[h2]Merge Sort[/h2]这里使用的Merge和Map端使用的Merge过程一样。Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。
Reduce端的Shuffle过程至此结束。

Shuffle过程小总结

[h1]1 Map端[/h1]

(1)在map端首先接触的是InputSplit,在InputSplit中含有DataNode中的数据,每一个InputSplit都会分配一个Mapper任务,Mapper任务结束后产生的输出,这些输出先存放在缓存中,每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spil l.percent),一个后台线程就把内容写到(spill)Linux本地磁盘中的指定目录(mapred.local.dir)下的新建的一个溢出写文件。
总结:map过程的输出是写入本地磁盘而不是HDFS,但是一开始数据并不是直接写入磁盘而是缓冲在内存中,缓存的好处就是减少磁盘I/O的开销,提高合并和排序的速度。又因为默认的内存缓冲大小是100M(当然这个是可以配置的),所以在编写map函数的时候要尽量减少内存的使用,为shuffle过程预留更多的内存,因为该过程是最耗时的过程。
(2)写磁盘前,要进行partition、sort和combine等操作。通过分区,将不同类型的数据分开处理,之后对不同分区的数据进行排序,如果有Combiner,还要对排序后的数据进行combine。等最后记录写完,将全部溢出文件合并为一个分区且排序的文件。
(3)最后将磁盘中的数据送到Reduce中,从图中可以看出Map输出有三个分区,有一个分区数据被送到图示的Reduce任务中,剩下的两个分区被送到其他Reducer任务中。而图示的Reducer任务的其他的三个输入则来自其他节点的Map输出。
补充:在写磁盘的时候采用压缩的方式将map的输出结果进行压缩是一个减少网络开销很有效的方法!关于如何使用压缩,在本文第三部分会有介绍。
[h1]2 Reduce端[/h1]
(1)Copy阶段:Reducer通过Http方式得到输出文件的分区。
reduce端可能从n个map的结果中获取数据,而这些map的执行速度不尽相同,当其中一个map运行结束时,reduce就会从JobTracker中获取该信息。map运行结束后TaskTracker会得到消息,进而将消息汇报给JobTracker,reduce定时从JobTracker获取该信息,reduce端默认有5个数据复制线程从map端复制数据。
(2)Merge阶段:如果形成多个磁盘文件会进行合并
从map端复制来的数据首先写到reduce端的缓存中,同样缓存占用到达一定阈值后会将数据写到磁盘中,同样会进行partition、combine、排序等过程。如果形成了多个磁盘文件还会进行合并,最后一次合并的结果作为reduce的输入而不是写入到磁盘中。
(3)Reducer的参数:最后将合并后的结果作为输入传入Reduce任务中。
总结:当Reducer的输入文件确定后,整个Shuffle操作才最终结束。之后就是Reducer的执行了,最后Reducer会把结果存到HDFS上。
Hadoop中的压缩
[h1]1 解压缩算法的实现:Codec[/h1]  Codec是Hadoop中关于压缩,解压缩的算法的实现,在Hadoop中,codec由CompressionCode的实现来表示。下面是一些常见压缩算法实现,如下图所示:


[h1]2 MapReduce的输出进行压缩[/h1]  (1)MapReduce的输出属性如下所示


  (2)在Java中如何针对输出设置压缩 ★★★

上图中在reduce端输出压缩使用了刚刚Codec中的Gzip算法,当然你也可以使用bzip2算法;
  • 使用MapReduce完成join操作
  • 使用MapReduce完成排序操作
  • 使用MapReduce完成二次排序操作
  • 使用MapReduce完成小文件合并操作




分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:15
帖子:3
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP