Iterating through a Spark RDD

I am starting with a Spark DataFrame and want to create a vector matrix for further analytics processing.

feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()

The output is an array of vectors. Some of those vectors have a null in them:

>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])

From this I want to iterate through the vector matrix and create a LabeledPoint array, labelled 1.0 if the vector contains a null and 0.0 otherwise.

def f(row):
    # label 1.0 if the vector contains a null, 0.0 otherwise
    if None in row:
        return LabeledPoint(1.0, row)
    else:
        return LabeledPoint(0.0, row)

I have tried to iterate through the vector matrix using

feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors)  # create a generator of labeled points
next(feature_matrix_labeledPoint)  # run the iteration protocol

but this doesn't work:

TypeError: 'PipelinedRDD' object is not iterable

Any help would be great.




1 Answer

RDDs are not a drop-in replacement for Python lists. You have to use the actions or transformations available on a given RDD. Here you can simply use map:

from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint


feature_matrix_vectors = sc.parallelize([
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])

(feature_matrix_vectors
    .map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
    .collect())
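
One caveat worth noting: DenseVector stores its values as a NumPy float64 array, so a None passed in at construction time is usually coerced to NaN, and NaN never compares equal to anything (not even to itself or to None). If the membership test above doesn't fire on your data, a NaN-based check is a more robust variant. A minimal sketch, assuming the same RDD of DenseVectors as above:

import numpy as np

from pyspark.mllib.regression import LabeledPoint

# label 1.0 when the vector holds a NaN (a coerced null), 0.0 otherwise
labeled = feature_matrix_vectors.map(
    lambda v: LabeledPoint(1.0 if np.isnan(v.toArray()).any() else 0.0, v)
)
labeled.collect()
# e.g. [LabeledPoint(0.0, [1.0,31.0,5.0,1935.0,24.0]),
#       LabeledPoint(1.0, [1.0,1231.0,15.0,2008.0,nan])]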