When Hadoop submits a job, it splits the input data logically (into input splits), and each split is processed by one Mapper. The number of Mappers is therefore equal to the number of input splits created. InputFormat.getSplits() is responsible for generating the input splits, and each split is used as the input of one map task.
How is the splitting of a file invoked in Apache Hadoop? An input file to be processed is stored on HDFS. The InputFormat component of the MapReduce job divides this file into splits. These splits are called InputSplits in Hadoop MapReduce.
In HDFS, files are divided into blocks and distributed across the cluster. The blocks are replicated to handle hardware failure, and checksums are added for each block for corruption detection and recovery.
What is InputSplit in Hadoop? InputSplit in Hadoop MapReduce is the logical representation of data. It describes a unit of work that contains a single map task in a MapReduce program. Hadoop InputSplit represents the data which is processed by an individual Mapper.
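As a rough sketch of the driver side of this (a hypothetical SplitSizeDriver using the newer org.apache.hadoop.mapreduce API; the size values and the args[0] input path are only illustrative, and mapper/output settings are omitted), these are the knobs that influence the split size and therefore the number of Mappers:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitSizeDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "split-size-demo");

    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // Lower/upper bounds fed into max(minSize, min(maxSize, blockSize));
    // with these values the block size usually ends up deciding the split size.
    FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB

    // One map task is launched per InputSplit returned by getSplits();
    // mapper/reducer/output settings are omitted here for brevity.
  }
}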
Interesting question, I spent some time looking at the code for the details and here are my thoughts. The splits are handled by the client by InputFormat.getSplits, so a look at FileInputFormat gives the following info. It computes the split size according to

max(minSize, min(maxSize, blockSize))

where maxSize corresponds to mapred.max.split.size and minSize is mapred.min.split.size. It then divides the file into different FileSplits based on the split size calculated above. What's important here is that each FileSplit is initialized with a start parameter corresponding to the offset in the input file. There is still no handling of the lines at that point. The relevant part of the code looks like this:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}
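To make the arithmetic concrete, here is a small stand-alone sketch (plain Java, not Hadoop code, in a made-up SplitMath class) that applies the same formula and loop to a hypothetical 200 MB file with a 128 MB block size, assuming the default min/max split sizes:

public class SplitMath {
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long fileLength = 200 * mb;
    long blockSize  = 128 * mb;
    long minSize = 1;              // assumed default for mapred.min.split.size
    long maxSize = Long.MAX_VALUE; // assumed default for mapred.max.split.size

    long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 128 MB
    double SPLIT_SLOP = 1.1;       // same 10% slack as FileInputFormat

    long bytesRemaining = fileLength;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      System.out.printf("split at offset %d MB, length %d MB%n",
          (fileLength - bytesRemaining) / mb, splitSize / mb);
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) { // the tail becomes one last, smaller split
      System.out.printf("split at offset %d MB, length %d MB%n",
          (fileLength - bytesRemaining) / mb, bytesRemaining / mb);
    }
    // Prints a 128 MB split at offset 0 followed by a 72 MB split at offset 128 MB.
  }
}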
After that, if you look at the LineRecordReader, which is defined by the TextInputFormat, that's where the lines are handled. When the LineRecordReader is initialized, it tries to instantiate a LineReader, which is an abstraction to be able to read lines over an FSDataInputStream. There are 2 cases: if there is a CompressionCodec defined, then this codec is responsible for handling boundaries (probably not relevant to your question). If there is no codec, however, that's where things are interesting: if the start of your InputSplit is different from 0, then you backtrack 1 character and then skip the first line you encounter, identified by \n or \r\n (Windows)! The backtrack is important because, in case your line boundaries are the same as split boundaries, this ensures you do not skip a valid line. Here is the relevant code:
if (codec != null) {
  in = new LineReader(codec.createInputStream(fileIn), job);
  end = Long.MAX_VALUE;
} else {
  if (start != 0) {
    skipFirstLine = true;
    --start;
    fileIn.seek(start);
  }
  in = new LineReader(fileIn, job);
}
if (skipFirstLine) { // skip first line and re-establish "start".
  start += in.readLine(new Text(), 0,
                       (int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;
So, since the splits are calculated in the client, the mappers don't need to run in sequence; every mapper already knows whether it needs to discard the first line or not.
So basically, if you have 2 lines of 100 MB each in the same file, and to simplify let's say the split size is 64 MB, then when the input splits are calculated we will have the following scenario:
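Sketching that out under the rules above (a 200 MB file cut into four splits; the exact numbers are only illustrative):

Split 1: offset 0, length 64 MB
Split 2: offset 64 MB, length 64 MB
Split 3: offset 128 MB, length 64 MB
Split 4: offset 192 MB, length 8 MB

The mapper for split 1 has start == 0, so it skips nothing and reads line 1 in full, continuing past its own 64 MB boundary (possibly with remote reads) until the newline at 100 MB. The mapper for split 2 has start != 0, so it backtracks one byte, skips everything up to the end of line 1 at 100 MB, and then reads line 2 in full, again past its own split end. The mappers for splits 3 and 4 start inside line 2, so after skipping to the next newline they are already past the end of their splits and emit no records.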
The MapReduce algorithm does not work on physical blocks of the file; it works on logical input splits. An input split depends on where the records were written, because a record may span two blocks (and would otherwise be split across two mappers).
The way HDFS has been set up, it breaks down very large files into large blocks (for example, 128 MB each) and stores three copies of these blocks on different nodes in the cluster.
HDFS has no awareness of the content of these files. A record may start in Block-a, but the end of that record may be in Block-b.
To solve this problem, Hadoop uses a logical representation of the data stored in file blocks, known as input splits. When a MapReduce job client calculates the input splits, it figures out where the first whole record in a block begins and where the last record in the block ends.
The key point:
In cases where the last record in a block is incomplete, the input split includes location information for the next block and the byte offset of the data needed to complete the record.
Have a look at this article and the related SE question: About Hadoop/HDFS file splitting.
More details can be found in the documentation.
The Map-Reduce framework relies on the InputFormat of the job to split up the input file(s) into logical InputSplits, each of which is then assigned to an individual Mapper. InputSplit[] getSplits(JobConf job, int numSplits) is the API that takes care of this. FileInputFormat, which extends InputFormat, implements the getSplits() method; have a look at the internals of this method at grepcode.
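If you need to take control of splitting yourself, the usual entry points are getSplits() and isSplitable(). As a minimal sketch (a made-up WholeFileTextInputFormat, using the newer mapreduce API), here is an input format that forbids splitting so that each file becomes exactly one InputSplit and therefore one Mapper:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Reuses TextInputFormat's line-oriented RecordReader, but tells the framework
// never to split a file, so record boundaries can never straddle a split.
public class WholeFileTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}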
I see it as follows: the InputFormat is responsible for splitting the data into logical splits while taking the nature of the data into account.
Nothing prevents it from doing so, although it can add significant latency to the job: all the logic and reading around the desired split-size boundaries happens in the jobtracker.
The simplest record-aware input format is TextInputFormat. It works as follows (as far as I understood from the code): the input format creates splits by size, regardless of the lines, but the LineRecordReader always:
a) Skips the first line in the split (or part of it), if it is not the first split;
b) Reads one line past the end boundary of the split (if data is available, i.e. it is not the last split).
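As a toy illustration (made-up byte offsets): suppose a file contains the lines "alpha\n", "bravo\n", "charlie\n" (bytes 0-5, 6-11 and 12-19) and the split boundary falls at byte 8, inside "bravo". The reader for split 1 (start 0) emits "alpha" and, by rule b), keeps reading past byte 8 to finish and emit "bravo". The reader for split 2 (start 8) is not the first split, so by rule a) it skips up to the end of "bravo" and emits only "charlie". Every line is therefore processed exactly once.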
From what I've understood, when the FileSplit is initialized for the first block, the default constructor is called, so the values for start and length are zero initially. By the end of processing of the first block, if the last line is incomplete, then the value of length will be greater than the length of the split and it will read the first line of the next block as well. Due to this, the value of start for the second block will be greater than zero, and under this condition the LineRecordReader will skip the first line of the second block. (See source)
In case the last line of the first block is complete, then the value of length will be equal to the length of the first block and the value of start for the second block will be zero. In that case the LineRecordReader will not skip the first line and will read the second block from the beginning.
Makes sense?