In-depth understanding of the internal workings of the map phase in a MapReduce job in Hadoop?

I am reading Hadoop: The Definitive Guide, 3rd edition, by Tom White. It is an excellent resource for understanding the internals of Hadoop, especially MapReduce, which is what I am interested in.

From the book (page 205):

Shuffle and Sort

MapReduce makes the guarantee that the input to every reducer is sorted by key. The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle.

What I infer from this is that keys are sorted before they are sent to the reducer, which indicates that the output of the map phase of a job is sorted. Please note: I don't call it the mapper, since the map phase includes both the mapper (written by the programmer) and the built-in sort mechanism of the MR framework.


The Map Side

Each map task has a circular memory buffer that it writes the output to. The buffer is 100 MB by default, a size which can be tuned by changing the io.sort.mb property. When the contents of the buffer reaches a certain threshold size (io.sort.spill.percent, default 0.80, or 80%), a background thread will start to spill the contents to disk. Map outputs will continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort. Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.
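The partition-then-sort step described in the quoted passage can be illustrated with a small, self-contained sketch. This is not Hadoop's code: `SpillSketch`, `partitionFor`, and `spill` are hypothetical names, the buffer is a plain in-memory list, and the partition rule is the non-negative-hash-modulo formula used by Hadoop's default hash partitioner, applied here to Java `String` hash codes (so the partition numbers will not necessarily match what a real job assigns).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SpillSketch {
    // Hypothetical helper: non-negative hash modulo the number of reducers.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Groups the buffered records by partition, then sorts each partition by key.
    static Map<Integer, List<Map.Entry<String, Integer>>> spill(
            List<Map.Entry<String, Integer>> buffer, int numReducers) {
        Map<Integer, List<Map.Entry<String, Integer>>> partitions = new TreeMap<>();
        for (Map.Entry<String, Integer> record : buffer) {
            int p = partitionFor(record.getKey(), numReducers);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(record);
        }
        for (List<Map.Entry<String, Integer>> records : partitions.values()) {
            records.sort(Map.Entry.comparingByKey());
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Records in the order a word-count mapper might have emitted them.
        List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
        buffer.add(new SimpleEntry<>("yyyy", 3));
        buffer.add(new SimpleEntry<>("aaaa", 15));
        buffer.add(new SimpleEntry<>("xxxx", 2));
        buffer.add(new SimpleEntry<>("zzzz", 11));
        // Prints each partition with its records sorted by key.
        System.out.println(spill(buffer, 2));
    }
}
```

Keys are ordered only inside a partition; nothing is implied about the order of keys across partitions.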

My understanding of the above paragraph is that as the mapper produces key-value pairs, they are partitioned and sorted. A hypothetical example:

Consider mapper-1 for a word-count program:

>mapper-1 contents
partition-1
   xxxx: 2
   yyyy: 3
partition-2
   aaaa: 15
   zzzz: 11

(Note: within each partition the data is sorted by key, but partition-1's data and partition-2's data need not follow any sequential order relative to each other.)


Continuing with the chapter:

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.

My understanding here is (please note the bold phrase in the paragraph above, which tricked me): within a map task, several files may be spilled to disk, but they are merged into a single file that still contains partitions and is sorted. Consider the same example as above:

Before a single map-task is finished, its intermediate data could be:

mapper-1 contents

spill 1:             spill 2:             spill 3:
    partition-1          partition-1          partition-1
                            hhhh: 5
       xxxx: 2              xxxx: 3              mmmm: 2
       yyyy: 3              yyyy: 7              yyyy: 9

    partition-2          partition-2          partition-2
       aaaa: 15             bbbb: 15             cccc: 15
       zzzz: 10             zzzz: 15             zzzz: 13

After the map task completes, the output from the mapper will be a single file (note that the three spill files above are now merged, but no combiner is applied, assuming none is specified in the job configuration):

>Mapper-1 contents:
partition-1:
hhhh: 5
mmmm: 2
xxxx: 2
xxxx: 3
yyyy: 3
yyyy: 7
yyyy: 9
partition-2:
aaaa: 15
bbbb: 15
cccc: 15
zzzz: 10
zzzz: 15
zzzz: 13

So here partition-1 may correspond to reducer-1. That is, the data in the partition-1 segment above is sent to reducer-1, and the data in the partition-2 segment is sent to reducer-2.
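The merge of the spill files for partition-1 in the example above can be sketched as a k-way merge over already-sorted runs. This is only an illustration under stated assumptions: `SpillMergeSketch`, `Rec`, and `merge` are hypothetical names, all runs are merged in one pass (a real job merges at most io.sort.factor streams at a time), and ties on the key are broken by spill order, which is a choice made here and not something the framework is known to guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class SpillMergeSketch {
    // One map output record: a word and its count (hypothetical type).
    static final class Rec {
        final String key;
        final int value;
        Rec(String key, int value) { this.key = key; this.value = value; }
        @Override public String toString() { return key + ": " + value; }
    }

    // Merges sorted runs into one sorted list. Heap entries are {runIndex, offset}.
    static List<Rec> merge(List<List<Rec>> runs) {
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> {
            int c = runs.get(a[0]).get(a[1]).key.compareTo(runs.get(b[0]).get(b[1]).key);
            return c != 0 ? c : Integer.compare(a[0], b[0]); // assumption: earlier spill wins ties
        });
        for (int i = 0; i < runs.size(); i++) {
            if (!runs.get(i).isEmpty()) heap.add(new int[] {i, 0});
        }
        List<Rec> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] cur = heap.poll();
            List<Rec> run = runs.get(cur[0]);
            merged.add(run.get(cur[1]));
            // Advance the cursor of the run we just consumed from.
            if (cur[1] + 1 < run.size()) heap.add(new int[] {cur[0], cur[1] + 1});
        }
        return merged;
    }

    public static void main(String[] args) {
        // partition-1 of the three spills in the example.
        List<List<Rec>> partition1 = Arrays.asList(
            Arrays.asList(new Rec("xxxx", 2), new Rec("yyyy", 3)),
            Arrays.asList(new Rec("hhhh", 5), new Rec("xxxx", 3), new Rec("yyyy", 7)),
            Arrays.asList(new Rec("mmmm", 2), new Rec("yyyy", 9)));
        for (Rec r : merge(partition1)) {
            System.out.println(r);
        }
    }
}
```

Running the same merge over each partition in turn and writing the results back to back would give the single partitioned, sorted file described in the book passage. Duplicate keys stay as separate records because no combiner is involved.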

If my understanding so far is correct:

  1. How will I be able to get the intermediate file that has both partitions and sorted data from the mapper output?
  2. It is interesting to note that running the mapper alone does not produce sorted output, which seems to contradict the point that data sent to the reducer is sorted. More details here
  3. Also, no combiner is applied if only the mapper is run: More details here
brain storm asked Jul 23 '14 18:07


1 Answer

Map-only jobs work differently than Map-and-Reduce jobs. It's not inconsistent, just different.

How will I be able to get the intermediate file that has both partitions and sorted data from the mapper output?

You can't. There isn't a hook for getting pieces of data from the intermediate stages of MapReduce. The same is true for getting data after the partitioner, after a record reader, etc.

It is interesting to note that running the mapper alone does not produce sorted output, which seems to contradict the point that data sent to the reducer is sorted. More details here

It does not contradict it. Mappers sort because the reducer needs its input sorted to be able to do a merge. If there are no reducers, there is no reason to sort, so the framework doesn't. This is the right behavior, because sorting in a map-only job would only make my processing slower. I've never had a situation where I wanted my map output to be locally sorted.

Also, no combiner is applied if only the mapper is run: More details here

Combiners are an optimization. There is no guarantee that they actually run, or over what data. Combiners are mostly there to make the reducers more efficient. So, again, just like the local sorting, combiners do not run if there are no reducers because there is no reason for them to.

If you want combiner-like behavior, I suggest writing data into a buffer (hashmap perhaps) and then writing out locally-summarized data in the cleanup function that runs when a Mapper finishes. Be careful of memory usage if you want to do this. This is a better approach because combiners are specified as a good-to-have optimization and you can't count on them running... even when they do run.
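A minimal sketch of that buffering idea, written against plain Java collections so it runs on its own: `InMapperCombinerSketch` and its `map`, `cleanup`, and `emitted` members are hypothetical stand-ins. In a real job the same logic would sit in a Mapper subclass, with `map` accumulating into the hashmap and the cleanup method writing the summarized pairs to the task context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InMapperCombinerSketch {
    // Local summary held in memory for the lifetime of one map task.
    private final Map<String, Integer> counts = new HashMap<>();
    // Stand-in for the task's output; a real mapper would write to its context.
    private final List<String> emitted = new ArrayList<>();

    // Called once per input record: accumulate instead of emitting (word, 1).
    void map(String line) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                counts.merge(word, 1, Integer::sum);
            }
        }
    }

    // Called once when the task finishes: emit one summarized pair per key.
    void cleanup() {
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            emitted.add(e.getKey() + "\t" + e.getValue());
        }
        counts.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        InMapperCombinerSketch mapper = new InMapperCombinerSketch();
        mapper.map("xxxx yyyy xxxx");
        mapper.map("yyyy xxxx");
        mapper.cleanup();
        // One line per distinct word; HashMap iteration order is unspecified.
        mapper.emitted().forEach(System.out::println);
    }
}
```

The memory caveat applies directly: the map grows with the number of distinct keys a task sees, so a variant that flushes and clears the map once it passes some size limit may be safer for high-cardinality input.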

Donald Miner answered Nov 15 '22 08:11