 

Extracting a dictionary from an RDD in Pyspark

This is a homework question:

I have an RDD which is a collection of tuples. I also have a function which returns a dictionary from each input tuple. In a way, it is the opposite of a reduce function.

With map, I can easily go from an RDD of tuples to an RDD of dictionaries. But, since a dictionary is a collection of (key, value) pairs, I would like to convert the RDD of dictionaries into an RDD of (key, value) tuples with each dictionary's contents.

That way, if my RDD contains 10 tuples, and each tuple maps to a dictionary with 5 elements (for example), I end up with an RDD of 50 (key, value) tuples.
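For illustration, something like this (a made-up sketch: to_dict stands in for my real function and it assumes a SparkContext named sc):

rdd = sc.parallelize([("a", 1), ("b", 2)])        # RDD of tuples

def to_dict(t):
    # placeholder for my real function: one dictionary per input tuple
    return {"first": t[0], "second": t[1]}

dict_rdd = rdd.map(to_dict)                       # RDD of dictionaries
# what I want: an RDD of every (key, value) pair from every dictionary,
# e.g. [("first", "a"), ("second", 1), ("first", "b"), ("second", 2)]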

I assume this has to be possible, but how? (Maybe the problem is that I don't know what this operation is called in English.)

asked Jun 23 '15 by Roman Rdgz


2 Answers

My 2 cents:

There is a pair RDD function named "collectAsMap" that returns a dictionary from an RDD of key-value pairs.

Let me show you an example:

sample = someRDD.sample(False, 0.0001, 0)   # someRDD is an RDD of (key, value) pairs
sample_dict = sample.collectAsMap()
print(sample.collect())
print(sample_dict)

[('hi', 4123.0)]
{'hi': 4123.0}

Documentation here

Hope it helps! Regards!

answered Oct 16 '22 by Leandro Mora


I guess what you want is just a flatMap:

dicts = sc.parallelize([{"foo": 1, "bar": 2}, {"foo": 3, "baz": -1, "bar": 5}])
dicts.flatMap(lambda x: x.items())
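
Collecting the result gives every (key, value) pair from every dictionary (output shown just to illustrate the shape; the ordering of the pairs is not guaranteed):

dicts.flatMap(lambda x: x.items()).collect()
# e.g. [('foo', 1), ('bar', 2), ('foo', 3), ('baz', -1), ('bar', 5)]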

flatMap takes a function from an element of the RDD to an iterable and then concatenates the results. Another name for the same type of operation outside the Spark context is mapcat:

>>> from toolz.curried import map, mapcat, concat, pipe
>>> from itertools import repeat
>>> pipe(range(4), mapcat(lambda i: repeat(i, i + 1)), list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

or going step by step:

>>> pipe(range(4), map(lambda i: repeat(i, i + 1)), concat, list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

The same thing using itertools.chain:

>>> from itertools import chain
>>> pipe((repeat(i, i + 1) for i in range(4)), chain.from_iterable, list)
[0, 1, 1, 2, 2, 2, 3, 3, 3, 3]

answered Oct 16 '22 by zero323