I am trying to run a subquery inside a CASE statement in PySpark and it throws an exception. I am trying to create a new flag if the id in one table is present in a different table.
Can anyone please let me know if this is even possible in PySpark?
temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")
Here is the error:
AnalysisException: 'Predicate sub-queries can only be used in a Filter
As of Spark 2.0, Spark SQL supports subqueries. A subquery (a.k.a. subquery expression) is a query nested inside another query. There are several kinds of subqueries, among them a subquery as a source (inside a SQL FROM clause) and predicate subqueries (such as the IN expression above).
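For example, a subquery as a source is unrestricted. A quick sketch using the question's table names (assuming spark is the active SparkSession and Ids is a registered view):
# a FROM-clause subquery works anywhere a table could appear
spark.sql("select key from (select distinct key from Ids) t").show()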
PySpark's isin() (the IN operator in SQL) is used to check or filter whether DataFrame values exist in a given list of values. isin() is a method of the Column class which returns a boolean Column that is True where the value of the expression is contained in the evaluated values of the arguments.
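A minimal sketch of isin() as a filter predicate (the frame and values are made up for illustration; spark is an active SparkSession):
from pyspark.sql import functions as func
df = spark.createDataFrame([(1,), (2,), (5,)], ['key'])
df.filter(func.col('key').isin([1, 2, 3])).show()  # keeps key=1 and key=2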
The left anti join in PySpark uses the same join functionality, but it returns only the columns from the left DataFrame, and only for its non-matched records.
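A quick sketch with two throwaway frames:
# left anti join: keep left rows whose id has no match on the right
left = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'val'])
right = spark.createDataFrame([(1,), (2,)], ['id'])
left.join(right, on='id', how='left_anti').show()  # only id=3 remains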
This appears to be the most recent detailed documentation on subqueries; it covers Spark 2.0, and I haven't seen a major update in this area since then.
The linked notebook in that reference makes it clear that predicate subqueries are indeed currently supported only within WHERE clauses. That is, this would work (but of course would not yield the desired result):
spark.sql("select * from main_table where id in (select distinct id from ids_table)")
You could get the same result by using a left JOIN; that's what IN subqueries are generally translated into (for more details refer to the aforementioned linked notebook).
For example:
# set up some data
from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()
l1 = [('Alice', 1), ('Bob', 2), ('Eve', 3)]
df1 = spark.createDataFrame(l1, ['name', 'id'])
l2 = [(1,), (2,)]
df2 = spark.createDataFrame(l2, ['id'])
df1.createOrReplaceTempView("main_table")
df2.createOrReplaceTempView("ids_table")
# use a left join: a non-matched row gets NULL on the right side, which maps to flag=0
spark.sql("select * from main_table m left join ids_table d on (m.id=d.id)") \
    .withColumn('flag', func.when(func.col('d.id').isNull(), 0).otherwise(1)) \
    .drop('id').collect()
# result:
[Row(name='Bob', flag=1), Row(name='Eve', flag=0), Row(name='Alice', flag=1)]
Or, using PySpark SQL functions rather than SQL syntax:
# rename to avoid an ambiguous 'id' column after the join
df2 = df2.withColumnRenamed('id', 'id_faux')
df1.join(df2, df1.id == df2.id_faux, how='left') \
    .withColumn('flag', func.when(func.col('id_faux').isNull(), 0).otherwise(1)) \
    .drop('id_faux').collect()