
Need an instance of RDD but got class 'pyspark.rdd.PipelinedRDD'

Hi, I have this code in a notebook and am trying to write Spark code in Python:

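    # (end of an earlier function that builds mydataNoSQL; its def line is not shown)
    mydataNoSQL.createOrReplaceTempView("mytable")
    spark.sql("SELECT * from mytable")  # note: this result is never used
    return mydataNoSQL

def getsameData(df, spark):
    # Keep only the rows that have a temperature reading
    result = spark.sql("select * from mytable where temperature is not null")
    # Sample ~10% of the rows without replacement and keep only the temperature value
    return result.rdd.sample(False, 0.1).map(lambda row: row.temperature)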

I need an RDD instance, but I am getting an instance of class 'pyspark.rdd.PipelinedRDD' instead.

Any help will be welcome.

Oscar C. asked Jun 04 '17 14:06


1 Answer

pyspark.rdd.PipelinedRDD is a subclass of RDD, so it has all the APIs defined on RDD. In other words, PipelinedRDD is just a special type of RDD that PySpark creates when you apply a transformation such as map, filter or flatMap to an RDD.

For example, take a look at the snippet below:

>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD

So you can simply treat your pyspark.rdd.PipelinedRDD as an RDD in your code.

Python has no real type casting, since it is a dynamically typed language. To force your pyspark.rdd.PipelinedRDD into a plain RDD, you can collect the RDD and parallelize it back:

>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>

Note that collect brings the RDD's entire contents to the driver, so it may cause a MemoryError if the data is large.

rogue-one answered Oct 18 '22 14:10