I am learning Python and Hadoop. I completed the setup and the basic examples from the official site using Python + Hadoop streaming. Then I moved on to implementing a join of two files. I completed an equi-join: if the same key appears in both input files, it outputs the key along with the values from file 1 and file 2, in that order. The equality join works as expected.
Now I wish to do an inequality join, which involves computing the cross product before applying the inequality condition. I am using the same mapper (do I need to change it?) and I changed the reducer so that it contains a nested loop (since every key-value pair from file 1 must be matched with all key-value pairs from file 2). This doesn't work, because you can only go through the stream once. I then considered storing 'some' values in the reducer and comparing them, but I have no idea 'how' many. The naive method is to store the whole content of file 2 in an array (or a similar structure), but that is wasteful and goes against the idea of distributed processing. Finally, my questions are:
How can I store values in the reducer so that I can compute the cross product between the two files?
In the equi-join, Hadoop sends all key-value pairs with the same key to the same reducer, which is perfectly fine and works well for that case. However, how do I change this behaviour (if needed) so that the required grouping of key-value pairs goes to the correct reducer?
Sample Files: http://pastebin.com/ufYydiPu
Python Map/Reduce Scripts: http://pastebin.com/kEJwd2u1
Hadoop Command I am using:
bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper mapper.py -file /home/hduser/ireducer.py -reducer ireducer.py -input /user/hduser/inputfiles/* -output /user/hduser/join-output
Any help/hint is much appreciated.
One way to deal with the many combinations, and a helpful way to avoid hand-written nested loops, is the itertools module. Specifically, itertools.product takes care of the cartesian product using generators. This is good for memory usage and efficiency, and it can simplify your code significantly if you have to join multiple data sets in one MapReduce job.
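For illustration, here is a minimal sketch (with made-up in-memory lists rather than streaming input) of how itertools.product replaces the nested loop:

import itertools

# Hypothetical per-key value lists from the two files.
values_file1 = [10, 20, 40]
values_file2 = [15, 5, 30]

# product() yields the pairs lazily instead of building them all at once.
for a, b in itertools.product(values_file1, values_file2):
    if a < b:  # example inequality condition
        print(a, b)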
Regarding the correspondence between the records emitted by the mapper and the data sets to be combined in the reducer: if the data sets for each key are not too big, you can simply have the mapper tag each record with its file of origin, e.g.:
key \t origin_1 \t value
key \t origin_2 \t value
Thus, in the reducer you can group the values with the same origin into a dictionary of lists, which are the data sets over which the cartesian product is applied using itertools.product.
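Putting that together, here is a minimal sketch of such a mapper and reducer. The 'file1'/'file2' origin tags, the tab-separated key<TAB>value input format, and the '<' condition are assumptions for illustration. The mapper detects the origin via the map_input_file environment variable that Hadoop streaming sets for each task (mapreduce_map_input_file in newer versions):

#!/usr/bin/env python
# mapper.py (sketch): tag every record with the file it came from.
import os
import sys

# Hadoop streaming exports the current split's file name as an
# environment variable; the check against 'file1' is an assumption
# about how the input files are named.
origin = "file1" if "file1" in os.environ.get("map_input_file", "") else "file2"

for line in sys.stdin:
    key, value = line.strip().split("\t", 1)
    print("%s\t%s\t%s" % (key, origin, value))

The reducer then buffers only the current key's values, grouped by origin in a dictionary, and emits their cartesian product:

#!/usr/bin/env python
# ireducer.py (sketch): per key, group values by origin, then apply
# itertools.product and the inequality condition.
import itertools
import sys

def emit(key, groups):
    # Values arrive as strings; cast them (e.g. int(a) < int(b))
    # if the comparison should be numeric.
    for a, b in itertools.product(groups.get("file1", []), groups.get("file2", [])):
        if a < b:  # example inequality condition
            print("%s\t%s\t%s" % (key, a, b))

current_key, groups = None, {}
for line in sys.stdin:
    key, origin, value = line.strip().split("\t", 2)
    if key != current_key:
        if current_key is not None:
            emit(current_key, groups)
        current_key, groups = key, {}
    groups.setdefault(origin, []).append(value)

if current_key is not None:
    emit(current_key, groups)

Note that this buffers only one key's values at a time rather than a whole file, so memory stays bounded as long as no single key's group is huge.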