
Creating Pyspark DataFrame column that coalesces two other Columns, why am I getting error of 'unicode' object has no attribute isNull?

I'm having some trouble with a PySpark DataFrame. Specifically, I'm trying to add a column that is the result of coalescing two existing columns of the same DataFrame.

E.g.

this_dataframe = this_dataframe.withColumn('new_max_price', coalesce(this_dataframe['max_price'],this_dataframe['avg(max_price)']).cast(FloatType()))

The problem is that the new column still contains null in some rows. I check this with:

this_dataset.where(col("new_max_price").isNull()).count()

The count is greater than zero, so while the code runs, it does not produce the intended result.

I found some similar questions (such as Selecting values from non-null columns in a PySpark DataFrame), but for some reason I'm unable to replicate their results.

Here's some code that I have that is based on the aforementioned link:

def coalesce_columns(c1, c2):
    if c1 != None and c2 != None:
        return c1
    elif c1 == None:
        return c2
    else:
        return c1

coalesceUDF = udf(coalesce_columns)
max_price_col = [coalesceUDF(col("max_price"), col("avg(max_price)")).alias("competitive_max_price")]
this_dataset.select(max_price_col).show()

When I execute the last line to check that my results are correct, I receive an error:

AttributeError: 'unicode' object has no attribute 'isNull'

So basically the question is: how can I use a Spark SQL function to create a column that is the result of coalescing two PySpark DataFrame columns? If this is impossible, what kind of UDF can I use to create a DataFrame column that I can append to another DataFrame?

asked Nov 01 '16 21:11 by BMac


1 Answer

I think coalesce is actually doing its job. The root of the problem is that some rows have null in both columns, and coalescing two nulls still gives null. Here is an example that may help:

from pyspark.sql import Row
from pyspark.sql.types import FloatType
from pyspark.sql.functions import coalesce, col

data = [Row(a="3.07",b="3.05"),
        Row(a="3.06",b="3.06"),
        Row(a="3.09",b=None),
        Row(a=None,b=None),
        Row(a=None,b="3.06"),
        Row(a=None,b=None)
       ]

df = sqlContext.createDataFrame(data)

tmp = df.withColumn('c', coalesce(df['a'],df['b']).cast(FloatType()))

tmp.where(col("c").isNotNull()).show()


+----+----+----+
|   a|   b|   c|
+----+----+----+
|3.07|3.05|3.07|
|3.06|3.06|3.06|
|3.09|null|3.09|
|null|3.06|3.06|
+----+----+----+
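
To make the both-null case concrete, here is a plain-Python sketch of the per-row rule that coalesce follows: take the first value that is not null, and return null only when every input is null. It is just an illustration, not Spark's implementation, and the names coalesce_values and rows are made up for this example.

```python
def coalesce_values(*values):
    """Return the first value that is not None, or None if all are None."""
    for value in values:
        if value is not None:
            return value
    return None


# (a, b) pairs mirroring the distinct cases in the example DataFrame above
rows = [("3.07", "3.05"), ("3.09", None), (None, "3.06"), (None, None)]

result = [coalesce_values(a, b) for a, b in rows]
print(result)  # ['3.07', '3.09', '3.06', None]
```

The last row is the one that still shows up in your isNull() count. If you would rather have a default there, pyspark.sql.functions.coalesce accepts any number of columns, so a literal such as lit(0.0) can be passed as a final argument; whether 0.0 is a sensible default for a price is something only you can decide.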

answered Nov 20 '22 17:11 by Luis