Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

pyspark 'DataFrame' object has no attribute '_get_object_id'

I am trying to run some code, but getting error:

'DataFrame' object has no attribute '_get_object_id'

The code:

items = [(1,12),(1,float('Nan')),(1,14),(1,10),(2,22),(2,20),(2,float('Nan')),(3,300),
         (3,float('Nan'))]

sc = spark.sparkContext
rdd = sc.parallelize(items)
df = rdd.toDF(["id", "col1"])

import pyspark.sql.functions as func
means = df.groupby("id").agg(func.mean("col1"))

# The error is thrown at this line
df = df.withColumn("col1", func.when((df["col1"].isNull()), means.where(func.col("id")==df["id"])).otherwise(func.col("col1"))) 
like image 700
Alon Avatar asked Aug 05 '19 17:08

Alon


People also ask

How do I know if a DataFrame is empty PySpark?

Method 1: isEmpty() The isEmpty function of the DataFrame or Dataset returns true when the DataFrame is empty and false when it's not empty. If the dataframe is empty, invoking “isEmpty” might result in NullPointerException. Note : calling df. head() and df.

How do I use ISIN in PySpark?

PySpark isin() or IN operator is used to check/filter if the DataFrame values are exists/contains in the list of values. isin() is a function of Column class which returns a boolean value True if the value of the expression is contained by the evaluated values of the arguments.

How do you convert PySpark DF to pandas DF?

Convert PySpark Dataframe to Pandas DataFramePySpark DataFrame provides a method toPandas() to convert it to Python Pandas DataFrame. toPandas() results in the collection of all records in the PySpark DataFrame to the driver program and should be done only on a small subset of the data.


1 Answers

You can't reference a second spark DataFrame inside a function, unless you're using a join. IIUC, you can do the following to achieve your desired result.

Suppose that means is the following:

#means.show()
#+---+---------+
#| id|avg(col1)|
#+---+---------+
#|  1|     12.0|
#|  3|    300.0|
#|  2|     21.0|
#+---+---------+

Join df and means on the id column, then apply your when condition

from pyspark.sql.functions import when

df.join(means, on="id")\
    .withColumn(
        "col1",
        when(
            (df["col1"].isNull()), 
            means["avg(col1)"]
        ).otherwise(df["col1"])
    )\
    .select(*df.columns)\
    .show()
#+---+-----+
#| id| col1|
#+---+-----+
#|  1| 12.0|
#|  1| 12.0|
#|  1| 14.0|
#|  1| 10.0|
#|  3|300.0|
#|  3|300.0|
#|  2| 21.0|
#|  2| 22.0|
#|  2| 20.0|
#+---+-----+

But in this case, I'd actually recommend using a Window with pyspark.sql.functions.mean:

from pyspark.sql import Window
from pyspark.sql.functions import col, mean

df.withColumn(
    "col1",
    when(
        col("col1").isNull(), 
        mean("col1").over(Window.partitionBy("id"))
    ).otherwise(col("col1"))
).show()
#+---+-----+
#| id| col1|
#+---+-----+
#|  1| 12.0|
#|  1| 10.0|
#|  1| 12.0|
#|  1| 14.0|
#|  3|300.0|
#|  3|300.0|
#|  2| 22.0|
#|  2| 20.0|
#|  2| 21.0|
#+---+-----+
like image 101
pault Avatar answered Oct 22 '22 12:10

pault