I am using Hadoop to analyze a very uneven distribution of data. Some keys have thousands of values, but most have only one. For example, network traffic associated with IP addresses would have many packets associated with a few talkative IPs and just a few with most IPs. Another way of saying this is that the Gini index is very high.
To process this efficiently, each reducer should either get a few high-volume keys or a lot of low-volume keys, in such a way as to get a roughly even load. I know how I would do this if I were writing the partition process: I would take the sorted list of keys (including all duplicate keys) that was produced by the mappers, as well as the number of reducers N, and put splits at split[i] = keys[floor(i*len(keys)/N)]. Reducer i would then get the keys k such that split[i] <= k < split[i+1] for 0 <= i < N-1, and the keys k such that split[i] <= k for i == N-1.
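Just to make the split computation concrete, here is a minimal sketch of what I have in mind (the String key type and the source of the full sorted key list are placeholders; obtaining that list is exactly the part I don't know how to do):

    import java.util.List;

    // Minimal sketch of the split computation described above: given the full
    // sorted list of keys (duplicates included) and the number of reducers N,
    // pick N evenly spaced keys as split points. The String key type and the
    // way the sorted list is obtained are placeholders for illustration only.
    public class SplitComputation {
        static String[] computeSplits(List<String> sortedKeys, int numReducers) {
            String[] splits = new String[numReducers];
            for (int i = 0; i < numReducers; i++) {
                // split[i] = keys[floor(i * len(keys) / N)]
                splits[i] = sortedKeys.get((int) ((long) i * sortedKeys.size() / numReducers));
            }
            return splits;
        }
    }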
I'm willing to write my own partitioner in Java, but the Partitioner<KEY,VALUE> class only seems to have access to one key-value record at a time, not the whole list. I know that Hadoop sorts the records that were produced by the mappers, so this list must exist somewhere. It might be distributed among several partitioner nodes, in which case I would do the splitting procedure on one of the sublists and somehow communicate the result to all other partitioner nodes. (Assuming that the chosen partitioner node sees a randomized subset, the result would still be approximately load-balanced.) Does anyone know where the sorted list of keys is stored, and how to access it?
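For reference, a one-record-at-a-time partitioner over precomputed split points would look roughly like this (the split keys are hard-coded placeholders; in reality they would have to be computed from the real key distribution and shipped to every partitioner instance somehow, which is the part I'm missing):

    import java.util.Arrays;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Hypothetical range partitioner: getPartition() only ever sees one
    // key-value pair, so the split points must already be available. They are
    // hard-coded placeholders here; in practice they would have to be loaded
    // from a side file or the job configuration.
    public class RangePartitioner extends Partitioner<Text, Text> {
        private static final Text[] SPLITS = {
            new Text(""), new Text("10."), new Text("192.")  // placeholder split keys
        };

        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            int idx = Arrays.binarySearch(SPLITS, key);
            // For a missing key, binarySearch returns -(insertionPoint) - 1;
            // the reducer index is the index of the last split <= key.
            int reducer = (idx >= 0) ? idx : -idx - 2;
            return Math.min(Math.max(reducer, 0), numPartitions - 1);
        }
    }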
I don't want to write two map-reduce jobs, one to find the splits and another to actually use them, because that seems wasteful. (The mappers would have to do the same job twice.) This seems like a general problem: uneven distributions are pretty common.
I've been thinking about this problem, too. This is the high-level approach I would take if someone forced me to.
This all assumes that the partitioner isn't called until all mappers have finished, but that's the best I've been able to do so far.
To the best of my understanding, there is no single place in MR processing where all keys are present. More than that, there is no guarantee that a single machine can store this data.
I don't think this problem has an ideal solution in the current MR framework, because an ideal solution would require waiting for the last mapper to finish, and only then analyzing the key distribution and parameterizing the partitioner with that knowledge.
That approach would significantly complicate the system and increase latency.
I think a good approximation would be to do random sampling over the data to get an idea of the key distribution and then make the partitioner work according to it.
As far as I understand, the TeraSort implementation does something very similar: http://sortbenchmark.org/YahooHadoop.pdf
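If I read that right, the equivalent with Hadoop's built-in classes would be InputSampler plus TotalOrderPartitioner, roughly like this (the path, the reducer count, and the key type are example values, and the rest of the job setup is omitted):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    // Sketch of the sampling approach: sample the input to estimate the key
    // distribution, write the resulting split points to a partition file, and
    // let TotalOrderPartitioner range-partition the map output against them.
    // The path and reducer count are placeholders; input format, mapper,
    // reducer, and output setup are omitted.
    public class SampledPartitioningJob {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "skewed-keys");
            job.setNumReduceTasks(16);               // N reducers (example value)
            job.setMapOutputKeyClass(Text.class);
            // ... set input format, input/output paths, mapper, and reducer here ...

            // Sample roughly 1% of the records, capped at 10,000 samples drawn
            // from at most 10 input splits.
            InputSampler.Sampler<Text, Text> sampler =
                new InputSampler.RandomSampler<>(0.01, 10000, 10);

            Path partitionFile = new Path("/tmp/_partitions");   // placeholder path
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
            InputSampler.writePartitionFile(job, sampler);       // derive splits from the sample

            job.setPartitionerClass(TotalOrderPartitioner.class);
            // job.waitForCompletion(true);
        }
    }

One caveat, as far as I know: InputSampler samples the keys coming out of the job's InputFormat, so this only works directly when the map output keys are the same as the input keys; otherwise the sample has to be collected some other way.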