I have a mapper method:
def mapper(value):
    ...
    for key, value in some_list:
        yield key, value
What I need is actually not far from the ordinary wordcount example. I already have a working script, but only if the mapper method looks like this:
def mapper(value):
    ...
    return key, value
This is how it is called:
sc.textFile(sys.argv[2], 1).map(mapper).reduceByKey(reducer).collect()
I spent 2 hours trying to write code that would support generators in the mapper, but couldn't get it to work. I would even settle for just returning a list:
def mapper(value):
    ...
    result_list = []
    for key, value in some_list:
        # list.append takes a single argument, so the pair goes in as one tuple
        result_list.append((key, value))
    return result_list
Here: https://groups.google.com/forum/#!searchin/spark-users/flatmap$20multiple/spark-users/1WqVhRBaJsU/-D5QRbenlUgJ I found that I should use flatMap, but it didn't do the trick: my reducer then started to get inputs like (key1, value1, key2, value2, value3, ...), when it should get [(key1, value1), (key2, value2, value3)...]. In other words, the reducer started receiving single items, without knowing whether each one is a key or a value and, if it is a value, which key it belongs to.
So how do I use mappers that return iterators or lists?
Thanks!
You can use flatMap if you want a map function that returns multiple outputs. The function passed to flatMap can return an iterable:
>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     return ((word, 1) for word in line.split())
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]
It can also be a generator function:
>>> words = sc.textFile("README.md")
>>> def mapper(line):
...     for word in line.split():
...         yield (word, 1)
...
>>> words.flatMap(mapper).take(4)
[(u'#', 1), (u'Apache', 1), (u'Spark', 1), (u'Lightning-Fast', 1)]
>>> counts = words.flatMap(mapper).reduceByKey(lambda x, y: x + y)
>>> counts.take(5)
[(u'all', 1), (u'help', 1), (u'webpage', 1), (u'when', 1), (u'Hadoop', 12)]
You mentioned that you tried flatMap but it flattened everything down to a list [key, value, key, value, ...] instead of a list [(key, value), (key, value)...] of key-value pairs. I suspect that this is a problem in your map function. If you're still experiencing this problem, could you post a more complete version of your map function?
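As a guess at what might be going on (I haven't seen your real mapper, so this is only an assumption): if a mapper written for map, which returns a single (key, value) tuple, is passed to flatMap unchanged, flatMap will iterate over that tuple and emit the key and the value as separate records. The sketch below reproduces the effect without Spark. flat_map is a hypothetical local stand-in for RDD.flatMap built on itertools.chain, and both mapper names are made up for illustration.

```python
from itertools import chain


def flat_map(func, records):
    """Hypothetical local stand-in for RDD.flatMap: apply func to each
    record, then flatten the results by exactly one level."""
    return list(chain.from_iterable(func(r) for r in records))


def mapper_single_pair(line):
    # Written for map(): returns ONE (key, value) tuple.
    word = line.split()[0]
    return (word, 1)


def mapper_many_pairs(line):
    # Written for flatMap(): yields MANY (key, value) tuples.
    for word in line.split():
        yield (word, 1)


lines = ["a b", "a c"]

# The single tuple is itself iterable, so it gets unpacked into key, value.
flattened_wrong = flat_map(mapper_single_pair, lines)

# Each yielded tuple survives as one record.
flattened_right = flat_map(mapper_many_pairs, lines)

print(flattened_wrong)  # ['a', 1, 'a', 1]
print(flattened_right)  # [('a', 1), ('b', 1), ('a', 1), ('c', 1)]
```

If that is the cause, making the mapper yield tuples (or return a list of tuples) should give reduceByKey proper pairs again.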