Hi, I have this code in a notebook and am trying to write PySpark code:
mydataNoSQL.createOrReplaceTempView("mytable")
spark.sql("SELECT * FROM mytable")
return mydataNoSQL

def getsameData(df, spark):
    result = spark.sql("SELECT * FROM mytable WHERE temperature IS NOT NULL")
    return result.rdd.sample(False, 0.1).map(lambda row: row.temperature)
I need an RDD instance, but I am getting a <class 'pyspark.rdd.PipelinedRDD'>.
Any help will be welcome.
pyspark.rdd.PipelinedRDD is a subclass of RDD, so it has all the APIs defined on RDD; i.e., PipelinedRDD is just a special type of RDD that is created when you run a map function on an RDD.
For example, take a look at the snippet below.
>>> rdd = spark.sparkContext.parallelize(range(1,10))
>>> type(rdd)
<class 'pyspark.rdd.RDD'> ## the type is RDD here
>>> rdd = rdd.map(lambda x: x * x)
>>> type(rdd)
<class 'pyspark.rdd.PipelinedRDD'> ## after the map operation the type is changed to pyspark.rdd.PipelinedRDD
So you should just treat your pyspark.rdd.PipelinedRDD as an RDD in your code.
Python is dynamically typed, so there is no real casting support; to forcefully convert your pyspark.rdd.PipelinedRDD to a plain RDD, you can collect the data and parallelize it back:
>>> rdd = spark.sparkContext.parallelize(rdd.collect())
>>> type(rdd)
<class 'pyspark.rdd.RDD'>
Note that running collect on an RDD may cause a MemoryError if the RDD's data is large.