How to implement NOT IN for two DataFrames with different structure in Apache Spark

I am using Apache Spark in my Java application. I have two DataFrames: df1 and df2. df1 contains Rows with email, firstName and lastName; df2 contains Rows with only email.

I want to create a DataFrame df3 that contains all the rows from df1 whose email is not present in df2.

Is there a way to do this with Apache Spark? I tried converting df1 and df2 to JavaRDD&lt;String&gt; with toJavaRDD(), mapping each row to its email, and then using subtract, but I don't know how to map the resulting JavaRDD&lt;String&gt; back to the rows of df1 to get a DataFrame.

Basically, I need all the Rows in df1 whose email is not in df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer ");

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " +
                            "WHERE product_id = '" + productId + "'");

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0));

List<String> notBoughtEmails = customers.javaRDD()
                        .map(row -> row.getString(0))
                        .subtract(customersBoughtEmail).collect();
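
For reference, one way to finish that approach (a sketch, not from the original code: it assumes the collected notBoughtEmails list fits in driver memory, and Column.isin requires Spark 1.5+) is to filter the original DataFrame by the collected emails:

// Keep only the customers whose email survived the subtract.
// notBoughtEmails.toArray() feeds the varargs isin(Object...).
DataFrame customersWhoHaventOrdered = customers.filter(
        customers.col("email").isin(notBoughtEmails.toArray()));

For a large customer table this collect-and-filter approach won't scale, which is where the join-based answer below comes in.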
asked Nov 11 '15 by Ivan Stoyanov


1 Answer

Spark 2.0.0+

You can use NOT IN directly; Spark 2.0 added support for (NOT) IN predicate subqueries in Spark SQL.
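
Since the question uses the Java API, here is a minimal sketch of what that can look like in Spark 2.x Java (assuming a SparkSession named spark; the view name "ordered" and the variable names are illustrative, not from the original code):

// Spark 2.x Java API (org.apache.spark.sql.Dataset, Row).
// Option 1: NOT IN as a SQL predicate subquery.
customers.createOrReplaceTempView("customers");
customersWhoOrderedTheProduct.createOrReplaceTempView("ordered");

Dataset<Row> df3 = spark.sql(
    "SELECT * FROM customers " +
    "WHERE email NOT IN (SELECT email FROM ordered)");

// Option 2: the same result without SQL. A left anti join keeps only
// the rows on the left that have no match on the right.
Dataset<Row> df3ViaAntiJoin = customers.join(
    customersWhoOrderedTheProduct,
    customers.col("email").equalTo(customersWhoOrderedTheProduct.col("email")),
    "left_anti");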

Spark < 2.0.0

It can be expressed with an outer join and a filter.

// In spark-shell; otherwise import sqlContext.implicits._ for toDF and $.
val customers = sc.parallelize(Seq(
  ("john@example.com", "John", "Doe"),
  ("jane@example.com", "Jane", "Doe")
)).toDF("email", "first_name", "last_name")

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
  Tuple1("jane@example.com")
)).toDF("email")

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")),
    $"email" === $"email_", "leftouter")
 .where($"email_".isNull).drop("email_")

customersWhoHaventOrderedTheProduct.show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+

Raw SQL equivalent:

customers.registerTempTable("customers")
customersWhoOrderedTheProduct.registerTempTable(
  "customersWhoOrderedTheProduct")

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN  
                 customersWhoOrderedTheProduct o
               ON c.email = o.email
               WHERE o.email IS NULL"""

sqlContext.sql(query).show

// +----------------+----------+---------+
// |           email|first_name|last_name|
// +----------------+----------+---------+
// |john@example.com|      John|      Doe|
// +----------------+----------+---------+
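
For the Java API in the question, a rough translation of the same outer-join-and-filter approach (a sketch against the Spark 1.x DataFrame API, reusing the customers and customersWhoOrderedTheProduct variables from the question):

// Alias the right-hand email so the filter can tell the columns apart.
DataFrame ordered = customersWhoOrderedTheProduct
        .select(customersWhoOrderedTheProduct.col("email").alias("email_"));

DataFrame customersWhoHaventOrderedTheProduct = customers
        .join(ordered, customers.col("email").equalTo(ordered.col("email_")), "leftouter")
        .where(ordered.col("email_").isNull())
        .drop("email_");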
answered by zero323