I have a pyspark.rdd.PipelinedRDD (Rdd1). When I call Rdd1.collect(), it gives a result like the one below.
[(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
(1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
(2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
(3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
Now I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method.
My final DataFrame should look like this when I call df.show():
+----------+-------+-------------------+
|CId |IID |Score |
+----------+-------+-------------------+
|10 |4 |2.9996439803387602 |
|10 |5 |1.6767412921625855 |
|10 |3 |3.616726727464709 |
|1 |4 |-1.5271512313750577|
|1 |5 |1.9665475696370045 |
|1 |3 |2.016527311459324 |
|2 |4 |4.033642544526678 |
|2 |5 |3.1517805604906313 |
|2 |3 |6.230272144805092 |
|3 |4 |2.9757316477407443 |
|3 |5 |-1.5689126834176417|
|3 |3 |-0.3924680103722977|
+----------+-------+-------------------+
I can achieve this by calling collect() on the RDD, iterating over the result, and then building a DataFrame, but I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without calling collect() at all.
How can I achieve this?
A Spark RDD can be converted to a DataFrame with toDF(), with createDataFrame(), or by first transforming it into an RDD of Row objects.
Method 1: Using the createDataFrame() function. After creating the RDD, convert it to a DataFrame by passing the RDD and a schema to createDataFrame().
collect() vs. select(): select() is a transformation that returns a new DataFrame holding only the selected columns, whereas collect() is an action that returns the entire data set to the driver (as an array in Scala, or a list in Python).
Converting with createDataFrame: the SparkSession object has a utility method for creating a DataFrame, createDataFrame. It can take an RDD and create a DataFrame from it, and it can be called with the RDD alone or with the RDD and a schema.
You want to do two things here: (1) flatten your data and (2) put it into a DataFrame.
One way to do it is as follows:
First, let us flatten the dictionary:
rdd2 = Rdd1.flatMapValues(lambda d: list(d.items()))
When collecting the data, you get something like this:
[(10, (3, 3.616726727464709)), (10, (4, 2.9996439803387602)), ...
Then we can format the data and turn it into a dataframe:
rdd2.map(lambda x : (x[0], x[1][0], x[1][1]))\
.toDF(("CId", "IID", "Score"))\
.show()
which gives you this:
+---+---+-------------------+
|CId|IID| Score|
+---+---+-------------------+
| 10| 3| 3.616726727464709|
| 10| 4| 2.9996439803387602|
| 10| 5| 1.6767412921625855|
| 1| 3| 2.016527311459324|
| 1| 4|-1.5271512313750577|
| 1| 5| 1.9665475696370045|
| 2| 3| 6.230272144805092|
| 2| 4| 4.033642544526678|
| 2| 5| 3.1517805604906313|
| 3| 3|-0.3924680103722977|
| 3| 4| 2.9757316477407443|
| 3| 5|-1.5689126834176417|
+---+---+-------------------+
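The two steps above can also be folded into one per-record function. The sketch below is only an illustration: the helper name flatten_record and the two-record sample list are made up here, and it runs on a plain Python list so the flattening logic can be checked without a Spark session.

```python
def flatten_record(record):
    """Turn one (cid, {iid: score, ...}) pair into a list of (cid, iid, score) tuples."""
    cid, scores = record
    return [(cid, iid, score) for iid, score in scores.items()]


# Hypothetical sample with the same shape as the first two records of Rdd1.
sample = [
    (10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
    (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
]

# Plain-Python stand-in for what flatMap would do with this function on an RDD.
rows = [row for record in sample for row in flatten_record(record)]

for row in rows:
    print(row)
```

Assuming an active SparkSession, the same function should work on the real RDD as Rdd1.flatMap(flatten_record).toDF(["CId", "IID", "Score"]), which stays distributed and never calls collect().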