PySpark - Subquery in a case statement

I am trying to run a subquery inside a case statement in PySpark and it is throwing an exception. I am trying to create a new flag column that is set to 1 when the id in one table is present in a different table.

Can anyone please let me know if this is even possible in pyspark?

temp_df=spark.sql("select *, case when key in (select distinct key from Ids) then 1 else 0 end as flag from main_table")

Here is the error:

AnalysisException: 'Predicate sub-queries can only be used in a Filter
asked Mar 15 '18 by kkumar

People also ask

Does Spark SQL support subqueries?

As of Spark 2.0, Spark SQL supports subqueries. A subquery (aka subquery expression) is a query that is nested inside another query. Spark SQL supports subqueries as a source (inside a SQL FROM clause), scalar subqueries, and predicate subqueries (IN/EXISTS expressions inside a WHERE clause).
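For instance, a subquery as a source can be used directly in the FROM clause. A minimal sketch, assuming the main_table temp view registered in the answer below:

# a subquery used as a source inside the FROM clause (columns name, id assumed)
counts = spark.sql("""
    select t.id, count(*) as cnt
    from (select distinct name, id from main_table) t
    group by t.id
""")
counts.show()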

How do I use isin() in PySpark?

PySpark's isin() (the IN operator) is used to check or filter whether DataFrame values are contained in a list of values. isin() is a function of the Column class which returns True if the value of the expression is contained in the evaluated values of the arguments.
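A minimal sketch of isin(); note that it takes a list of literal values rather than another DataFrame, so for the original question it only helps if the list of ids is small enough to collect:

from pyspark.sql import functions as func

# hypothetical DataFrame; flag rows whose id appears in a literal list of ids
df = spark.createDataFrame([('Alice', 1), ('Bob', 2), ('Eve', 5)], ['name', 'id'])
df.withColumn('flag', func.when(func.col('id').isin([1, 2, 3]), 1).otherwise(0)).show()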

What is a left anti join in PySpark?

A left anti join in PySpark uses the same join functionality, but it returns only the columns of the left DataFrame, and only for the rows that have no match in the right DataFrame.
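A minimal sketch of a left anti join, reusing df1 and df2 as created in the answer below (before the id column is renamed); it keeps only the rows of df1 whose id has no match in df2, i.e. the rows that would get flag=0. With the sample data, only the Eve row (id 3) would come back:

# left anti join: rows from df1 whose id does not appear in df2
df1.join(df2, on='id', how='left_anti').show()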


1 Answer

This appears to be the latest detailed documentation regarding subqueries - it relates to Spark 2.0, but I haven't seen a major update in this area since then.

The linked notebook in that reference makes it clear that predicate subqueries are currently supported only within WHERE clauses. That is, this would work (though of course it would not yield the desired result):

spark.sql("select * from main_table where id in (select distinct id from ids_table)")

You could get the same result by using a LEFT JOIN - that's what IN subqueries are generally translated into (for more details, refer to the aforementioned notebook).

For example:

# set up some data
from pyspark.sql import SparkSession
from pyspark.sql import functions as func

spark = SparkSession.builder.getOrCreate()

l1 = [('Alice', 1), ('Bob', 2), ('Eve', 3)]
df1 = spark.createDataFrame(l1, ['name', 'id'])

l2 = [(1,), (2,)]
df2 = spark.createDataFrame(l2, ['id'])

df1.createOrReplaceTempView("main_table")
df2.createOrReplaceTempView("ids_table")

# left join, then derive the flag from whether the right-hand id is null
spark.sql("select * from main_table m left join ids_table d on (m.id = d.id)") \
    .withColumn('flag', func.when(func.col('d.id').isNull(), 0).otherwise(1)) \
    .drop('id').collect()

# result:
[Row(name='Bob', flag=1), Row(name='Eve', flag=0), Row(name='Alice', flag=1)]

Or, using PySpark SQL functions rather than SQL syntax:

# rename the id column on the right so the joined result is unambiguous
df2 = df2.withColumnRenamed('id', 'id_faux')
df1.join(df2, df1.id == df2.id_faux, how='left') \
    .withColumn('flag', func.when(func.col('id_faux').isNull(), 0).otherwise(1)) \
    .drop('id_faux').collect()
answered by etov