
Broadcast a dictionary to rdd in PySpark

I am just getting the hang of Spark, and I have a function that needs to be mapped over an RDD but uses a global dictionary:

    from pyspark import SparkContext

    sc = SparkContext('local[*]', 'pyspark')

    my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}  # at no point will be modified
    my_list = ["a", "d", "c", "b"]

    def my_func(letter):
        return my_dict[letter]

    my_list_rdd = sc.parallelize(my_list)

    result = my_list_rdd.map(lambda x: my_func(x)).collect()

    print(result)

The above gives the expected result; however, I am not sure about my use of the global variable my_dict. It seems that a copy of the dictionary is made for every partition, and that does not feel right.

It looks like a broadcast variable is what I need. However, when I try to use one:

    my_dict_bc = sc.broadcast(my_dict)

    def my_func(letter):
        return my_dict_bc[letter]

I get the following error:

    TypeError: 'Broadcast' object has no attribute '__getitem__'

This seems to imply that I cannot broadcast a dictionary.

My question: if I have a function that uses a global dictionary and needs to be mapped over an RDD, what is the proper way to do it?

My example is very simple, but in reality my_dict and my_list are much larger, and my_func is more complicated.

asked Jan 13 '16 15:01 by Akavall


1 Answer

You forgot something important about Broadcast objects: they are wrappers, and the data is stored in a property called value. Indexing the Broadcast object itself fails because it does not define __getitem__.

Therefore you need to modify my_func to something like this:

    my_dict_bc = sc.broadcast(my_dict)

    def my_func(letter):
        return my_dict_bc.value[letter]
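For completeness, here is a minimal end-to-end sketch of the same fix, reusing the dictionary and list from the question. The names make_lookup and main and the app name 'broadcast-demo' are made up for illustration, and passing the Broadcast object into a helper instead of reading a global is just one way to arrange it, not the only correct one.

```python
def make_lookup(bc):
    """Return a function that looks a key up in a broadcast dictionary.

    `bc` is expected to be a pyspark Broadcast object; only its `.value`
    attribute is used, which holds the original dictionary.
    """
    def lookup(letter):
        # Unwrap the broadcast with .value before indexing.
        return bc.value[letter]
    return lookup


def main():
    # Imported here so that make_lookup itself has no Spark dependency.
    from pyspark import SparkContext

    sc = SparkContext('local[*]', 'broadcast-demo')  # hypothetical app name
    try:
        my_dict = {"a": 1, "b": 2, "c": 3, "d": 4}
        my_list = ["a", "d", "c", "b"]

        # Wrap the read-only dictionary in a broadcast variable.
        my_dict_bc = sc.broadcast(my_dict)

        result = sc.parallelize(my_list).map(make_lookup(my_dict_bc)).collect()
        print(result)  # [1, 4, 3, 2]
    finally:
        sc.stop()


if __name__ == "__main__":
    main()
```

The only change that matters compared with the failing version is `.value`: the Broadcast object is a handle, and the dictionary lives behind it.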

answered Sep 29 '22 12:09 by Alberto Bonsanto