Banging my head a little with this one, and I suspect the answer is very simple. Given two dataframes, I want to filter the first so that it keeps only the rows whose value in one column is not present in a column of the other dataframe.
I would like to do this without resorting to full-blown Spark SQL, so just using DataFrame.filter, Column.contains, Column.isin, or one of the join methods.
val df1 = Seq(("Hampstead", "London"),
("Spui", "Amsterdam"),
("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq(("London"),("Amsterdam"), ("New York")).toDF("cities")
val res = df1.filter(df2("cities").contains("city") === false)
// doesn't work, nor do the 20 other variants I have tried
Anyone got any ideas?
First, we need to modify the original DataFrame to add the row with data [3, 10]. Perform a left join, eliminating duplicates in df2 so that each row of df1 joins with exactly one row of df2. Use the indicator parameter to return an extra column indicating which table the row was from.
I've discovered that I can solve this using a simpler method: an anti-join is available by passing "leftanti" as the join type to the join method, although the Spark Scaladoc does not describe it:
import org.apache.spark.sql.functions._
val df1 = Seq(("Hampstead", "London"),
("Spui", "Amsterdam"),
("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq(("London"),("Amsterdam"), ("New York")).toDF("cities")
df1.join(df2, df1("city") === df2("cities"), "leftanti").show
Results in:
+----------+-------+
| location| city|
+----------+-------+
|Chittagong|Chennai|
+----------+-------+
P.S. thanks for the pointer to the duplicate - duly marked as such
If you are trying to filter a DataFrame using another, you should use join (or any of its variants). If what you need is to filter it using a List or any data structure that fits in your master and workers, you could broadcast it, then reference it inside the filter or where method.
For instance I would do something like:
import org.apache.spark.sql.functions._
val df1 = Seq(("Hampstead", "London"),
("Spui", "Amsterdam"),
("Chittagong", "Chennai")).toDF("location", "city")
val df2 = Seq(("London"),("Amsterdam"), ("New York")).toDF("cities")
df2.join(df1, joinExprs=df1("city") === df2("cities"), joinType="full_outer")
.select("city", "cities")
.where(isnull($"cities"))
.drop("cities").show()
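The second option mentioned above, filtering against a small in-memory collection, has no example in this answer, so here is a minimal sketch of one way it might look. It assumes the distinct values of df2("cities") are few enough to collect to the driver. The object name FilterByCollectedList and the helper filterNotIn are hypothetical names made up for illustration, and the local SparkSession is only there so the snippet runs on its own (in spark-shell, `spark` already exists). It uses Column.isin with the collected values rather than an explicit broadcast variable.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object FilterByCollectedList {

  // Hypothetical helper: keeps the rows of `left` whose `leftCol` value
  // does not appear in `right(rightCol)`.
  // Assumption: the distinct values of `rightCol` fit comfortably on the driver.
  def filterNotIn(left: DataFrame, leftCol: String,
                  right: DataFrame, rightCol: String): DataFrame = {
    // Pull the distinct lookup values back to the driver.
    val values: Array[Any] = right.select(rightCol).distinct().collect().map(_.get(0))
    // isin takes varargs, so the array is expanded with `: _*`.
    left.filter(!col(leftCol).isin(values: _*))
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("filter-not-in")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(("Hampstead", "London"),
      ("Spui", "Amsterdam"),
      ("Chittagong", "Chennai")).toDF("location", "city")
    val df2 = Seq("London", "Amsterdam", "New York").toDF("cities")

    // Keeps only the (Chittagong, Chennai) row, both columns intact.
    filterNotIn(df1, "city", df2, "cities").show()

    spark.stop()
  }
}
```

Unlike the full_outer variant, this keeps every column of df1. It only makes sense while the lookup list stays small; for a large df2 the join-based approach is the safer choice.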