I am using Apache Spark in my Java application. I have two DataFrames: df1 and df2. df1 contains Rows with email, firstName and lastName. df2 contains Rows with email.

I want to create a DataFrame df3 that contains all the rows in df1 whose email is not present in df2.

Is there a way to do this with Apache Spark? I tried to create a JavaRDD<String> from each of df1 and df2 with toJavaRDD(), mapping df1 down to its emails and then using subtract, but I don't know how to map the resulting JavaRDD back to df1 and get a DataFrame.

Basically I need all rows that are in df1 whose email is not in df2.
// Spark 1.x DataFrame API with the Cassandra connector
DataFrame customers = sqlContext.cassandraSql(
        "SELECT email, first_name, last_name FROM customer");
DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql(
        "SELECT email FROM customer_bought_product " +
        "WHERE product_id = '" + productId + "'");

// Emails of customers who ordered the product
JavaRDD<String> customersBoughtEmail =
        customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

// Emails that appear in customers but not in customersBoughtEmail
List<String> notBoughtEmails = customers.javaRDD()
        .map(row -> row.getString(0))
        .subtract(customersBoughtEmail)
        .collect();
Spark 2.0.0+
You can use NOT IN directly.
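For example, registering both DataFrames as temp views and running a NOT IN subquery (a sketch assuming a Spark 2.0 SparkSession named spark; the view and column names mirror the example below):

```scala
customers.createOrReplaceTempView("customers")
customersWhoOrderedTheProduct.createOrReplaceTempView("customersWhoOrderedTheProduct")

// Rows in customers whose email does not appear in customersWhoOrderedTheProduct
spark.sql("""
  SELECT * FROM customers
  WHERE email NOT IN (SELECT email FROM customersWhoOrderedTheProduct)
""").show
```

Note the usual SQL caveat: if the subquery returns any NULL email, NOT IN matches nothing.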
Spark < 2.0.0
It can be expressed using outer join and filter.
import sqlContext.implicits._  // for toDF and the $ column syntax outside spark-shell

val customers = sc.parallelize(Seq(
("[email protected]", "John", "Doe"),
("[email protected]", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")
val customersWhoOrderedTheProduct = sc.parallelize(Seq(
Tuple1("[email protected]")
)).toDF("email")
val customersWhoHaventOrderedTheProduct = customers.join(
customersWhoOrderedTheProduct.select($"email".alias("email_")),
$"email" === $"email_", "leftouter")
.where($"email_".isNull).drop("email_")
customersWhoHaventOrderedTheProduct.show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |[email protected]| John| Doe|
// +----------------+----------+---------+
Raw SQL equivalent:
customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
"customersWhoOrderedTheProduct")
val query = """SELECT c.* FROM customers c LEFT OUTER JOIN
customersWhoOrderedTheProduct o
ON c.email = o.email
WHERE o.email IS NULL"""
sqlContext.sql(query).show
// +----------------+----------+---------+
// | email|first_name|last_name|
// +----------------+----------+---------+
// |[email protected]| John| Doe|
// +----------------+----------+---------+
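On Spark 2.0+ the same result can also be obtained with a single anti join in the DataFrame API, without aliasing or dropping the extra column (a sketch using the same DataFrames as above):

```scala
// "left_anti" keeps only the rows of customers with no matching email
// in customersWhoOrderedTheProduct
val customersWhoHaventOrderedTheProduct =
  customers.join(customersWhoOrderedTheProduct, Seq("email"), "left_anti")
```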