MapReduce在执行map()函数之前,还做了大量的工作,例如数据的切片,将切片生成键值对传给map()函数等等,在执行map()之前做了很多的事情,今天就记录一下输入数据的切片和记录。(tips:由于也是刚刚学,看了权威指南,分享一下心得,有什么问题希望大家指正)
1、简单介绍一下切片,记录以及map()方法之间的联系
一个输入的切片(split)就是一个由单个map操作来处理的输入块。每一个map()操作只处理一个输入分片。每个分片被划分为若干记录,每一个记录就是一个键值对,map()一个接一个的处理记录(后面用一个TextInputFormat输入格式来演示一下)。
2、介绍一下InputSplit接口和InputFormat
InputSplit接口:
输入分片在java中表示一个接口。(idea中使用ctrl+n来全局搜索)
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public abstract class InputSplit {
public InputSplit() {
}
public abstract long getLength() throws IOException, InterruptedException;
public abstract String[] getLocations() throws IOException, InterruptedException;
}
InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名称)。注意分片不包含数据本身而是指向数据的引用。存储的位置供MapReduce系统使用以便将map任务尽量放在数据分片的旁边,也就是就近原则,避免网络的大量的传输数据造成网络压力过大。
InputFormat:
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
3、MapReduce中切片生成记录到运行map()方法的流程
运行作业的客户端在通过调用InputFormat中getSplits()方法计算切片,然后将他们发送到application master,application master使用其存储的位置信息来调度map任务从而在集群上处理这些分片数据。map任务就把这个切片的数据传给InputFormat中的createRecordReader()方法来获取这个方法的RecordReader。RecordReader就是记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后再传递给map()函数。查看Mapper的run()方法可以看到这些情况:
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
运行setup()方法后,在重复的调用Context上的nextKeyValue()(委托给RecordReader的同名方法)为mapper产生键值对,通过Context,键值对从RecordReader中被索引出并且传递给map()方法。当reader读到stream的结尾时,nextKeyValue()方法返回false,map运行其cleanup()方法,然后结束。上面你的过程我想用一个流程图画出来,但是画了好几次,都不知道怎么画出来好,以后理解深刻了再更新吧。
4、对文本输入的偏移量的理解
在学习mr的过程中,对输入的偏移量一直都不是很了解,看了书之后,对LongWritable类型有了一点点认识,所以就记录下来。
TextInputFormat是默认的InputFormat。每一条记录是一行输入。键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何的行终止符(换行符和回车符),它被打包成一个Text对象。所以,包含如下文本的文件被切分为包含四条记录的一个切片:
on the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
每一条记录表示一条键值对:
(0,on the top of the Crumpetty Tree) //到tree的最后一个e是32字节,为此下面key是33
(33,The Quangle Wangle sat,)
(57,But his face you could not see,)
(89,On account of his Beaver Hat.)
贴一下测试中Mapper中华的代码以及运行的结果:
//v表示一行文本
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyOut = new Text();
IntWritable valueOut = new IntWritable();
//对文本文件用空格来切数据 保存为字符串的数组
String [] arr = value.toString().split(" ");
//将key打印出来
System.out.println(key+":"+value.toString()+":"+value.getLength());
for (String s : arr){
keyOut.set(s);
valueOut.set(1);
context.write(keyOut,valueOut);
}
}
运行结果:
为什么和上面提到的偏移量不一样?!!
|