
List (or iterator) of tuples returned by MAP (PySpark)

I have a mapper method:

def mapper(value):
    ...
    for key, value in some_list:
        yield key, value

What I need is not far from the ordinary wordcount example, actually. I already have a working script, but only if the mapper method looks like this:

def mapper(value):
    ...
    return key, value

This is how it is called:

sc.textFile(sys.argv[2], 1).map(mapper).reduceByKey(reducer).collect()

I spent 2 hours trying to write code that would support generators in the mapper, but couldn't get it to work. I would even settle for just returning a list:

def mapper(value):
    ...
    result_list = []
    for key, value in some_list:
        result_list.append((key, value))
    return result_list

Here: https://groups.google.com/forum/#!searchin/spark-users/flatmap$20multiple/spark-users/1WqVhRBaJsU/-D5QRbenlUgJ I found that I should use flatMap, but it didn't do the trick: my reducer then started to get inputs like (key1, value1, key2, value2, value3, ...), when it should be [(key1, value1), (key2, value2, value3), ...]. In other words, the reducer received only single pieces and couldn't tell whether each was a key or a value, and if a value, which key it belonged to.
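(For context on the symptom above: flatMap chains together whatever iterable the mapper returns for each element. If the mapper returns a bare (key, value) tuple, flatMap iterates over that tuple and emits key and value as separate elements, which is exactly the flattened stream described. A minimal plain-Python sketch of this behaviour, with a hypothetical `flat_map` helper standing in for Spark's flatMap, no Spark installation needed:)

```python
from itertools import chain

# Mimics flatMap semantics: apply f to each element, then chain
# the resulting iterables into one flat sequence.
def flat_map(f, data):
    return list(chain.from_iterable(f(x) for x in data))

lines = ["a b", "c"]

# Mapper that returns a bare tuple: flatMap iterates over the
# tuple itself, so keys and values come out as separate elements.
def bad_mapper(line):
    return (line.split()[0], 1)

print(flat_map(bad_mapper, lines))   # ['a', 1, 'c', 1]

# Mapper that returns a list of tuples: the pairs stay intact.
def good_mapper(line):
    return [(word, 1) for word in line.split()]

print(flat_map(good_mapper, lines))  # [('a', 1), ('b', 1), ('c', 1)]
```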

So how do I use mappers that return iterators or lists?

Thanks!

Spaceman asked Jan 13 '14

1 Answer

You can use flatMap if you want a map function that returns multiple outputs.

The function passed to flatMap can return an iterable:

>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     return ((word, 1) for word in line.split())
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]

It can also be a generator function:

>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     for word in line.split():
...         yield (word, 1)
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]

You mentioned that you tried flatMap but it flattened everything down to a list [key, value, key, value, ...] instead of a list [(key, value), (key, value), ...] of key-value pairs. I suspect that this is a problem in your map function. If you're still experiencing this problem, could you post a more complete version of your map function?

Josh Rosen answered Sep 21 '22