I'm having some trouble with a PySpark DataFrame. Specifically, I'm trying to create a new column whose value is the result of coalescing two existing columns of the DataFrame.
For example:
this_dataframe = this_dataframe.withColumn('new_max_price', coalesce(this_dataframe['max_price'],this_dataframe['avg(max_price)']).cast(FloatType()))
The problem is that the new column still contains null values in certain rows. Specifically, I'm running this check:
this_dataset.where(col("new_max_price").isNull()).count()
This returns a count greater than zero. So, although the code runs, it does not produce the intended result.
I found other, similar questions (such as "Selecting values from non-null columns in a PySpark DataFrame"), but for some reason I'm unable to replicate their results.
Here's the code I wrote based on that question:
from pyspark.sql.functions import udf, col

def coalesce_columns(c1, c2):
    # Return c1 unless it is null, in which case fall back to c2
    if c1 is not None and c2 is not None:
        return c1
    elif c1 is None:
        return c2
    else:
        return c1

coalesceUDF = udf(coalesce_columns)
max_price_col = [coalesceUDF(col("max_price"), col("avg(max_price)")).alias("competitive_max_price")]
this_dataset.select(max_price_col).show()
When I execute the last line to check that the results are correct, I get this error:
AttributeError: 'unicode' object has no attribute 'isNull'
So basically the question is: how can I use a Spark SQL function to create a column that is the result of coalescing two PySpark DataFrame columns? If this is impossible, what kind of UDF can I use to create a DataFrame column that I can append to another DataFrame?
I think that coalesce is actually doing its job, and the root of the problem is that some rows have null values in both columns, so the result is still null after coalescing. Here is an example that may help.
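To make that reasoning concrete, here is a minimal pure-Python model of SQL `COALESCE` semantics (return the first argument that is not null, otherwise null), applied to the same six rows used in the example. `sql_coalesce` is a hypothetical helper for illustration only, not part of PySpark, and `None` stands in for SQL null.

```python
from typing import Optional


def sql_coalesce(*values: Optional[str]) -> Optional[str]:
    """Return the first value that is not None, or None if all are None.

    Hypothetical helper that mimics the null handling of SQL COALESCE.
    """
    for value in values:
        if value is not None:
            return value
    return None


# The same (a, b) pairs as the rows in the example DataFrame.
rows = [("3.07", "3.05"), ("3.06", "3.06"), ("3.09", None),
        (None, None), (None, "3.06"), (None, None)]

coalesced = [sql_coalesce(a, b) for a, b in rows]
print(coalesced)  # ['3.07', '3.06', '3.09', None, '3.06', None]

# The rows that stay null are exactly the rows where both inputs are null.
still_null = [i for i, c in enumerate(coalesced) if c is None]
both_null = [i for i, (a, b) in enumerate(rows) if a is None and b is None]
print(still_null == both_null)  # True
```

Coalescing can only pick among the values it is given, so a row with no non-null input has nothing to return but null.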
from pyspark.sql import Row
from pyspark.sql.types import FloatType
from pyspark.sql.functions import coalesce, col

# Rows 4 and 6 have nulls in both columns
data = [Row(a="3.07", b="3.05"),
        Row(a="3.06", b="3.06"),
        Row(a="3.09", b=None),
        Row(a=None, b=None),
        Row(a=None, b="3.06"),
        Row(a=None, b=None)]
df = sqlContext.createDataFrame(data)
# c takes a when it is not null, otherwise b, then is cast to float
tmp = df.withColumn('c', coalesce(df['a'], df['b']).cast(FloatType()))
tmp.where(col("c").isNotNull()).show()
+----+----+----+
| a| b| c|
+----+----+----+
|3.07|3.05|3.07|
|3.06|3.06|3.06|
|3.09|null|3.09|
|null|3.06|3.06|
+----+----+----+