 

Pyspark RDD: find index of an element

Tags:

python

pyspark

I am new to PySpark. I am trying to convert a Python list to an RDD and then find an element's index using the RDD. For the first part I am doing:

data = [[1, 2], [1, 4]]
rdd = sc.parallelize(data).cache()

So now the RDD holds my list. I want to find the index of an arbitrary element, something like the index method of Python lists. I am aware of a function called zipWithIndex, which assigns an index to each element, but I could not find a proper example in Python (there are examples in Java and Scala).

Thanks.

ahajib asked Apr 05 '16 22:04


People also ask

How do you find an RDD element by its index?

One approach uses a custom IndexedFetcher helper class written in Java (it is not a built-in Spark API). First, when you construct new IndexedFetcher(rdd, itemClass), it counts the number of elements in each partition of the RDD. Then, when you call indexedFetcher.get(n), it runs a job on only the partition that contains that index. Its author noted that, as of Spark 1.1, the class had to be compiled with Java 1.7 rather than 1.8.
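The partition-counting idea can be modelled in plain Python. This is a minimal sketch that assumes an RDD can be pictured as a list of partitions (a list of lists); partition_offsets and fetch are hypothetical names used only for illustration, not Spark APIs. In PySpark, the step that reads a single partition would roughly correspond to sc.runJob(rdd, func, partitions=[p]), which is not shown here.

```python
from bisect import bisect_right
from itertools import accumulate


def partition_offsets(partitions):
    """Global index of the first element of each partition (prefix sums of the counts)."""
    counts = [len(p) for p in partitions]  # step 1: count elements per partition
    return list(accumulate([0] + counts))[:-1]


def fetch(partitions, n):
    """Return the element at global index n, reading only the partition that holds it."""
    offsets = partition_offsets(partitions)
    total = sum(len(p) for p in partitions)
    if not 0 <= n < total:
        raise IndexError(f"index {n} out of range")
    # Last partition whose first global index is <= n; an empty partition shares
    # its successor's offset, so bisect_right skips past it.
    p = bisect_right(offsets, n) - 1
    # Hypothetical model: in PySpark this single-partition read would roughly be
    # sc.runJob(rdd, func, partitions=[p]).
    return partitions[p][n - offsets[p]]


parts = [["a", "b"], [], ["c"], ["d", "e"]]
print(partition_offsets(parts))  # [0, 2, 2, 3]
print(fetch(parts, 2))           # c
```

Counting once up front means each later lookup only has to scan one partition instead of the whole dataset.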

How do you use zipWithIndex in PySpark?

Zips this RDD with its element indices. The ordering is first based on the partition index and then the ordering of items within each partition. So the first item in the first partition gets index 0, and the last item in the last partition receives the largest index.
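That numbering rule can be sketched in plain Python, again picturing an RDD as a list of partitions (an illustration under that assumption, not Spark's implementation; zip_with_index is a hypothetical name). Each item's index is the number of items in all earlier partitions plus its position within its own partition. This is also why zipWithIndex has to trigger a Spark job to count partition sizes when the RDD has more than one partition.

```python
def zip_with_index(partitions):
    """Model of zipWithIndex over a list of partitions: returns (item, index) pairs."""
    result = []
    offset = 0  # number of items in all partitions seen so far
    for part in partitions:                  # partitions in partition-index order
        for i, item in enumerate(part):      # items in order within the partition
            result.append((item, offset + i))
        offset += len(part)                  # next partition starts after these items
    return result


# The question's data, split across two partitions:
print(zip_with_index([[[1, 2]], [[1, 4]]]))  # [([1, 2], 0), ([1, 4], 1)]
```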

What does RDD collect do?

collect() is an action on an RDD or DataFrame that retrieves its data. It gathers all the elements from every partition of the RDD and brings them back to the driver program, so it should only be used when the result is small enough to fit in the driver's memory.


1 Answer

Use filter and zipWithIndex:

# pair is an (element, index) tuple; Python 3 lambdas cannot unpack tuples in their parameters
(rdd.zipWithIndex()
    .filter(lambda pair: pair[0] == [1, 2])
    .map(lambda pair: pair[1])
    .collect())

Note that [1,2] here can easily be replaced by a variable, and the whole expression can be wrapped in a function.
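A minimal sketch of that wrapping, assuming rdd is an existing PySpark RDD such as the one from the question. find_indices and find_index are hypothetical helper names, not part of PySpark. find_index mimics Python's list.index by raising ValueError when the element is missing, and it uses take(1) instead of collect() so that not every match has to be gathered on the driver.

```python
# Assumes `rdd` is an existing PySpark RDD (e.g. built with sc.parallelize).
# No pyspark import is needed: these helpers only call methods on the RDD passed in.


def find_indices(rdd, target):
    """Return every 0-based index at which `target` occurs in `rdd`, in RDD order."""
    return (rdd.zipWithIndex()                          # (element, index) pairs
               .filter(lambda pair: pair[0] == target)  # keep matching elements
               .map(lambda pair: pair[1])               # keep only the index
               .collect())                              # bring the indices to the driver


def find_index(rdd, target):
    """Mimic list.index: the first index of `target`, or ValueError if it is absent."""
    first = (rdd.zipWithIndex()
                .filter(lambda pair: pair[0] == target)
                .map(lambda pair: pair[1])
                .take(1))                               # only the first match is needed
    if not first:
        raise ValueError(f"{target!r} is not in the RDD")
    return first[0]
```

For the question's RDD, find_indices(rdd, [1, 2]) gives [0] and find_index(rdd, [1, 4]) gives 1.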

How It Works

zipWithIndex pairs each element with its index and returns an RDD of (item, index) tuples:

rdd.zipWithIndex().collect()
> [([1, 2], 0), ([1, 4], 1)]

filter keeps only the pairs that match a particular criterion (in this case, that the element equals a specific sublist):

rdd.zipWithIndex().filter(lambda pair: pair[0] == [1, 2]).collect()
> [([1, 2], 0)]

map then extracts just the index from each matching pair:

(rdd.zipWithIndex().filter(lambda pair: pair[0] == [1, 2])
    .map(lambda pair: pair[1]).collect())
> [0]

Finally, if you want a single index rather than a list, take the first element with [0]; note that this raises an IndexError when the element does not occur in the RDD.

Akshat Mahajan answered Nov 14 '22 21:11