I am starting with a Spark DataFrame and want to create a vector matrix from it for further analytics processing (a hypothetical stand-in for the DataFrame is sketched below).
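For context, feature_matrix1 is just a DataFrame of numeric columns; a minimal, hypothetical stand-in (column names and values invented here, assuming an sqlContext is available) would be:
from pyspark.mllib.linalg import Vectors

# Hypothetical stand-in for the real DataFrame: five numeric columns,
# one row containing a null (names and values are made up)
feature_matrix1 = sqlContext.createDataFrame(
    [(1.0, 31.0, 5.0, 1935.0, 24.0),
     (1.0, 1231.0, 15.0, 2008.0, None)],
    ["c1", "c2", "c3", "c4", "c5"])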
feature_matrix_vectors = feature_matrix1.map(lambda x: Vectors.dense(x)).cache()
feature_matrix_vectors.first()
The output is an array of vectors, and some of those vectors have a null in them:
>>> DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0])
...
>>> DenseVector([1.0, 1231.0, 15.0, 2008.0, null])
From this I want to iterate through the vector matrix and create a LabeledPoint for each row, labelled 1 if the vector contains a null and 0 (zero) otherwise.
def f(row):
    if None in row:
        return LabeledPoint(1.0, row)
    else:
        return LabeledPoint(0.0, row)
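For reference, a LabeledPoint is just a (label, features) pair, so per row I am after something like this (values made up):
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint

# A clean row should come out labelled 0.0, a row with a null labelled 1.0
clean = LabeledPoint(0.0, DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]))
print(clean.label)     # 0.0
print(clean.features)  # [1.0,31.0,5.0,1935.0,24.0]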
I have tried to iterate through the vector matrix using
feature_matrix_labeledPoint = (f(row) for row in feature_matrix_vectors) # create a generator of labeled points
next(feature_matrix_labeledPoint) # Run the iteration protocol
but this doesn't work.
TypeError: 'PipelinedRDD' object is not iterable
Any help would be great.
RDDs are not a drop-in replacement for Python lists. You have to use either actions or transformations, which are available on a given RDD. Here you can simply use map:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
feature_matrix_vectors = sc.parallelize([
    DenseVector([1.0, 31.0, 5.0, 1935.0, 24.0]),
    DenseVector([1.0, 1231.0, 15.0, 2008.0, None])
])
(feature_matrix_vectors
.map(lambda v: LabeledPoint(1.0 if None in v else 0.0, v))
.collect())
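One caveat worth flagging (an assumption on my part, so verify against your Spark version): DenseVector stores its values as float64, so a Python None is typically coerced to NaN when the vector is built, in which case None in v will never match. A NaN-based check is a more defensive variant of the same map:
import numpy as np

# Same map as above, but flag NaN entries instead of literal None values,
# since None may have been converted to NaN inside the DenseVector
(feature_matrix_vectors
    .map(lambda v: LabeledPoint(1.0 if np.isnan(v.toArray()).any() else 0.0, v))
    .collect())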