In Hadoop Map-Reduce, does any class see the whole list of keys after sorting and before partitioning?

I am using Hadoop to analyze data with a very uneven distribution. Some keys have thousands of values, but most have only one. For example, in network traffic grouped by IP address, a few talkative IPs account for most of the packets while the majority of IPs contribute only a handful. Another way of saying this is that the Gini index is very high.

To process this efficiently, each reducer should either get a few high-volume keys or a lot of low-volume keys, in such a way as to get a roughly even load. I know how I would do this if I were writing the partition process: I would take the sorted list of keys (including all duplicate keys) that was produced by the mappers as well as the number of reducers N and put splits at

split[i] = keys[floor(i*len(keys)/N)]

Reducer i would get keys k such that split[i] <= k < split[i+1] for 0 <= i < N-1 and split[i] <= k for i == N-1.
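
In code, the splitting procedure would be roughly the following (purely illustrative; it assumes the whole sorted key list fits in memory as a List<String>, which is exactly what the Partitioner API doesn't give me):

    import java.util.List;

    class SplitSketch {
        // split[i] = keys[floor(i * len(keys) / N)], keys sorted and including duplicates
        static String[] computeSplits(List<String> sortedKeys, int n) {
            String[] split = new String[n];
            for (int i = 0; i < n; i++) {
                split[i] = sortedKeys.get((int) ((long) i * sortedKeys.size() / n));
            }
            return split;
        }

        // Reducer i gets keys k with split[i] <= k < split[i+1]; the last
        // reducer gets everything from split[N-1] upward.
        static int reducerFor(String k, String[] split) {
            for (int i = split.length - 1; i >= 1; i--) {
                if (k.compareTo(split[i]) >= 0) {
                    return i;
                }
            }
            return 0;
        }
    }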

I'm willing to write my own partitioner in Java, but the Partitioner<KEY,VALUE> class only seems to have access to one key-value record at a time, not the whole list. I know that Hadoop sorts the records that were produced by the mappers, so this list must exist somewhere. It might be distributed among several partitioner nodes, in which case I would do the splitting procedure on one of the sublists and somehow communicate the result to all other partitioner nodes. (Assuming that the chosen partitioner node sees a randomized subset, the result would still be approximately load-balanced.) Does anyone know where the sorted list of keys is stored, and how to access it?

I don't want to write two map-reduce jobs, one to find the splits and another to actually use them, because that seems wasteful. (The mappers would have to do the same job twice.) This seems like a general problem: uneven distributions are pretty common.

asked Aug 24 '12 by Jim Pivarski

2 Answers

I've been thinking about this problem, too. This is the high-level approach I would take if someone forced me.

  • In addition to the mapper logic you have in place to solve your business problem, code some logic to gather whatever statistics you'll need in the partitioner to distribute key-value pairs in a balanced manner. Of course, each mapper will only see some of the data.
  • Each mapper can find out its task ID and use that ID to build a unique filename in a specified HDFS folder to hold the gathered statistics. Write this file out in the cleanup() method, which runs at the end of the task.
  • Use lazy initialization in the partitioner to read all files in the specified HDFS directory. This gets you all of the statistics gathered during the mapper phase. From there you're left with implementing whatever partitioning logic you need to correctly partition the data (a rough sketch follows this list).
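
Here is a rough sketch of what I mean, with the statistics reduced to a simple per-key count; the HDFS folder /tmp/keystats, the key-extraction line, and the split logic are placeholders for whatever your job actually needs:

    import java.io.*;
    import java.util.*;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;

    class StatsMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Map<String, Long> keyCounts = new HashMap<String, Long>();

        @Override
        protected void map(LongWritable offset, Text line, Context ctx)
                throws IOException, InterruptedException {
            String key = line.toString().split("\t")[0];   // stand-in for your real key logic
            Long c = keyCounts.get(key);
            keyCounts.put(key, c == null ? 1L : c + 1L);   // statistics for the partitioner
            ctx.write(new Text(key), line);                // normal business output
        }

        @Override
        protected void cleanup(Context ctx) throws IOException {
            // One stats file per map task, named after the task id so files never collide.
            int taskId = ctx.getTaskAttemptID().getTaskID().getId();
            FileSystem fs = FileSystem.get(ctx.getConfiguration());
            PrintWriter out = new PrintWriter(fs.create(new Path("/tmp/keystats/task-" + taskId), true));
            for (Map.Entry<String, Long> e : keyCounts.entrySet()) {
                out.println(e.getKey() + "\t" + e.getValue());
            }
            out.close();
        }
    }

    class StatsAwarePartitioner extends Partitioner<Text, Text> implements Configurable {
        private Configuration conf;
        private String[] splits;   // lower bound of each reducer's key range

        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            if (splits == null) {
                splits = loadSplits(numPartitions);        // lazy initialization
            }
            int i = Arrays.binarySearch(splits, key.toString());
            return i >= 0 ? i : Math.max(0, -i - 2);       // bucket whose lower bound <= key
        }

        // Merge every mapper's stats file, then cut the sorted key list so each
        // reducer receives roughly 1/numPartitions of the total number of values.
        private String[] loadSplits(int numPartitions) {
            try {
                TreeMap<String, Long> counts = new TreeMap<String, Long>();
                FileSystem fs = FileSystem.get(conf);
                for (FileStatus f : fs.listStatus(new Path("/tmp/keystats"))) {
                    BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(f.getPath())));
                    String line;
                    while ((line = in.readLine()) != null) {
                        String[] kv = line.split("\t");
                        Long c = counts.get(kv[0]);
                        counts.put(kv[0], (c == null ? 0L : c) + Long.parseLong(kv[1]));
                    }
                    in.close();
                }
                long total = 0;
                for (long c : counts.values()) {
                    total += c;
                }
                String[] result = new String[numPartitions];
                long seen = 0;
                int bucket = 0;
                for (Map.Entry<String, Long> e : counts.entrySet()) {
                    // Open the next bucket each time another 1/numPartitions of the load has passed.
                    if (bucket < numPartitions && seen * numPartitions >= (long) bucket * total) {
                        result[bucket++] = e.getKey();
                    }
                    seen += e.getValue();
                }
                for (int i = Math.max(bucket, 1); i < numPartitions; i++) {
                    result[i] = result[i - 1];             // fewer distinct keys than reducers
                }
                return result;
            } catch (IOException e) {
                throw new RuntimeException("could not read key statistics", e);
            }
        }

        @Override public void setConf(Configuration conf) { this.conf = conf; }
        @Override public Configuration getConf() { return conf; }
    }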

This all assumes that the partitioner isn't called until all mappers have finished, but that's the best I've been able to do so far.

answered by Chris Gerken


To the best of my understanding, there is no single place in MapReduce processing where all the keys are present. More than that, there is no guarantee that a single machine could even store this data. I don't think this problem has an ideal solution in the current MR framework, because an ideal solution would require waiting for the last mapper to finish, only then analyzing the key distribution, and parameterizing the partitioner with that knowledge.
That approach would significantly complicate the system and raise latency.
I think a good approximation might be to do random sampling over the data to get an idea of the key distribution and then make the partitioner work according to it.
As far as I understand, the TeraSort implementation does something very similar: http://sortbenchmark.org/YahooHadoop.pdf
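
As an illustration, Hadoop ships with InputSampler and TotalOrderPartitioner, which implement exactly this sampling idea: sample the input keys, write split points to a partition file, and route each record by binary search against them. The sketch below is only an outline of the driver; it assumes the newer mapreduce API, an input format whose keys are Text, and map output keys that match the input keys (e.g. an identity-style mapper), since InputSampler samples the input format's keys:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    public class SampledPartitioning {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "skewed keys");
            job.setJarByClass(SampledPartitioning.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);   // input keys are Text
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(16);
            FileInputFormat.addInputPath(job, new Path(args[0]));

            // Sample roughly 1% of the input (at most 10000 records from 10 splits)
            // and write numReduceTasks - 1 split points to the partition file.
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                    new Path(args[1], "_partitions"));
            InputSampler.writePartitionFile(job,
                    new InputSampler.RandomSampler<Text, Text>(0.01, 10000, 10));

            // Each map output key is routed by binary search against the sampled split points.
            job.setPartitionerClass(TotalOrderPartitioner.class);
            // ... set mapper, reducer, output key/value classes and output path,
            // then job.waitForCompletion(true)
        }
    }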

answered by David Gruzman