Left Anti join in Spark?

I've defined two tables like this:

val tableName = "table1"
val tableName2 = "table2"
val format = new SimpleDateFormat("yyyy-MM-dd")

val data = List(
  List("mike", 26, true),
  List("susan", 26, false),
  List("john", 33, true)
)
val data2 = List(
  List("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  List("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  List("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
)

val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val rdd2 = sparkContext.parallelize(data2).map(Row.fromSeq(_))

val schema = StructType(Array(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("isBoy", BooleanType, false)
))
val schema2 = StructType(Array(
  StructField("name", StringType, true),
  StructField("grade", StringType, true),
  StructField("howold", IntegerType, true),
  StructField("hobby", StringType, true),
  StructField("birthday", DateType, false)
))

val df = sqlContext.createDataFrame(rdd, schema)
val df2 = sqlContext.createDataFrame(rdd2, schema2)
df.createOrReplaceTempView(tableName)
df2.createOrReplaceTempView(tableName2)

I'm trying to build a query that returns the rows from table1 that don't have a matching row in table2. I tried this query:

SELECT * FROM table1
LEFT JOIN table2
  ON table1.name = table2.name AND table1.age = table2.howold
  AND table2.name IS NULL AND table2.howold IS NULL

but this just gives me all rows from table1:

List({"name":"john","age":33,"isBoy":true}, {"name":"susan","age":26,"isBoy":false}, {"name":"mike","age":26,"isBoy":true})

How can I do this type of join efficiently in Spark?

I'm looking for an SQL query because I need to be able to specify which columns to compare between the two tables, not just compare row by row as suggested in other recommended questions (e.g. using subtract, except, etc.).

sergeda asked Apr 03 '17
People also ask

What is Spark left anti join?

Anti Join. An anti join returns the values from the left relation that have no match in the right relation. It is also referred to as a left anti join.
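To make the semantics concrete, here is a minimal sketch of left-anti-join behavior on plain Scala collections (no Spark required); the (name, age) pairs mirror the question's sample data:

```scala
// Plain-Scala sketch of left-anti-join semantics:
// keep each left row only if no right row matches on both fields.
val left  = List(("mike", 26), ("susan", 26), ("john", 33))
val right = List(("mike", 45), ("john", 33), ("john", 32), ("mike", 26), ("lena", 23))

val antiJoined = left.filterNot { case (name, age) =>
  right.exists { case (rName, rAge) => rName == name && rAge == age }
}
// antiJoined == List(("susan", 26))
```

Only susan survives, because mike/26 and john/33 both have matches on the right.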

What is the difference between left join and left anti join?

Anti-Join Example Using Code. A left anti-join can be written as an ordinary left join plus a WHERE clause that keeps only the rows with no match; that filter is what differentiates it from a typical left join. For example, such a query could find all admissions that have no matching physician_id in the physicians table.
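Applied to the question's tables, the left-join-plus-WHERE pattern would look roughly like this (note the null checks belong in WHERE, not in the ON clause, which is why the original query returned every row):

SELECT table1.*
FROM table1
LEFT JOIN table2
  ON table1.name = table2.name AND table1.age = table2.howold
WHERE table2.name IS NULL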

Does Hive support left anti join?

Here is a citation from the Hive manual: "LEFT SEMI JOIN implements the uncorrelated IN/EXISTS subquery semantics in an efficient way. As of Hive 0.13 the IN/NOT IN/EXISTS/NOT EXISTS operators are supported using subqueries, so most of these JOINs don't have to be performed manually anymore."
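In that style, an anti join over the question's tables could be expressed as a NOT EXISTS subquery; this is a sketch of the equivalent query, not taken from the Hive manual itself:

SELECT *
FROM table1 t1
WHERE NOT EXISTS (
  SELECT 1
  FROM table2 t2
  WHERE t2.name = t1.name AND t2.howold = t1.age
)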


1 Answer

You can use the "left anti" join type, either with the DataFrame API or with SQL (the DataFrame API supports everything that SQL supports, including any join condition you need):

DataFrame API:

df.as("table1").join(
  df2.as("table2"),
  $"table1.name" === $"table2.name" && $"table1.age" === $"table2.howold",
  "leftanti"
)

SQL:

sqlContext.sql(
  """SELECT table1.* FROM table1
    | LEFT ANTI JOIN table2
    | ON table1.name = table2.name AND table1.age = table2.howold
  """.stripMargin)

NOTE: there's a shorter, more concise way of creating the sample data without specifying the schema separately: use tuples and the implicit toDF method, then "fix" the automatically inferred schema where needed:

import spark.implicits._

val df = List(
  ("mike", 26, true),
  ("susan", 26, false),
  ("john", 33, true)
).toDF("name", "age", "isBoy")

val df2 = List(
  ("mike", "grade1", 45, "baseball", new java.sql.Date(format.parse("1957-12-10").getTime)),
  ("john", "grade2", 33, "soccer", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("john", "grade2", 32, "golf", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("mike", "grade2", 26, "basketball", new java.sql.Date(format.parse("1978-06-07").getTime)),
  ("lena", "grade2", 23, "baseball", new java.sql.Date(format.parse("1978-06-07").getTime))
).toDF("name", "grade", "howold", "hobby", "birthday")
  .withColumn("birthday", $"birthday".cast(DateType))
Tzach Zohar answered Oct 06 '22