
What is going wrong with `unionAll` of Spark `DataFrame`?

Using Spark 1.5.0 and the following code, I expect unionAll to union DataFrames based on their column names. The code uses a FunSuite-based helper that passes in the SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

Output:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

Why does the result contain intermixed "b" and "a" columns, instead of aligning columns based on column names? Sounds like a serious bug!?

Martin Senne asked Sep 21 '15 21:09



3 Answers

It doesn't look like a bug at all. What you see is standard SQL behavior, and every major RDBMS, including PostgreSQL, MySQL, Oracle and MS SQL, behaves exactly the same. You'll find SQL Fiddle examples linked with names.

To quote PostgreSQL manual:

In order to calculate the union, intersection, or difference of two queries, the two queries must be "union compatible", which means that they return the same number of columns and the corresponding columns have compatible data types

Column names, except those of the first table in the set operation, are simply ignored.

This behavior comes directly from relational algebra, where the basic building block is a tuple. Since tuples are ordered, a union of two sets of tuples is equivalent (ignoring duplicate handling) to the output you get here.
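To make the positional behavior concrete, here is a minimal plain-Scala sketch that does not involve Spark. The `Table` type and this `unionAll` function are hypothetical names invented for the illustration; they only model a header plus ordered rows, using the data from the question.

```scala
object PositionalUnionSketch {
  // Hypothetical minimal table model: a header and rows of ordered values.
  final case class Table(header: Seq[String], rows: Seq[Seq[Int]])

  // Positional union: only the column count is checked. The header of the
  // first table is kept and the rows of the second are appended unchanged.
  def unionAll(left: Table, right: Table): Table = {
    require(left.header.size == right.header.size, "column counts must match")
    Table(left.header, left.rows ++ right.rows)
  }

  val as = Table(Seq("a", "b"), Seq(Seq(1, 3), Seq(2, 4)))
  val bs = Table(Seq("b", "a"), Seq(Seq(5, 3), Seq(6, 4)))

  // The header of `bs` is ignored, so 5 and 6 end up under "a".
  val result = unionAll(as, bs)

  def main(args: Array[String]): Unit = {
    println(result.header.mkString(" | "))
    result.rows.foreach(row => println(row.mkString(" | ")))
  }
}
```

The result has the header (a, b) and the rows (1, 3), (2, 4), (5, 3), (6, 4), which matches the output shown in the question.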

If you want to match using names you can do something like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

To check both names and types, it should be enough to replace columns with:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq 
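One thing to keep in mind with the expression above is that a Set does not guarantee iteration order, so the resulting column order is not necessarily that of `a`. The following plain-Scala sketch shows one possible way to pick the shared (name, type) pairs while keeping the order of the first schema. The `sharedColumns` helper and the sample schemas are assumptions made for illustration; the pairs have the same shape as the output of `DataFrame.dtypes`.

```scala
object SharedColumnsSketch {
  // Same shape as DataFrame.dtypes: (column name, type name) pairs.
  type DTypes = Seq[(String, String)]

  // Hypothetical helper: names of columns present in both schemas with the
  // same type, listed in the order in which they appear in `a`.
  def sharedColumns(a: DTypes, b: DTypes): Seq[String] = {
    val inB = b.toSet
    a.collect { case (name, tpe) if inB((name, tpe)) => name }
  }

  // Illustrative schemas: "c" exists on both sides but with different types.
  val left: DTypes =
    Seq("a" -> "IntegerType", "b" -> "IntegerType", "c" -> "StringType")
  val right: DTypes =
    Seq("b" -> "IntegerType", "a" -> "IntegerType", "c" -> "IntegerType")

  // "c" is dropped because its types differ.
  val shared: Seq[String] = sharedColumns(left, right)

  def main(args: Array[String]): Unit =
    println(shared.mkString(", "))
}
```

With real DataFrames, something like `sharedColumns(a.dtypes.toSeq, b.dtypes.toSeq).map(col)` could then be passed to `select` on both sides.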
zero323 answered Sep 26 '22 16:09


This is addressed in Spark 2.3, which adds a unionByName method to Dataset.

https://issues.apache.org/jira/browse/SPARK-21043
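As a rough sketch of how this might look with the data from the question, assuming Spark 2.3 or later on the classpath and a local `SparkSession` (the object name, the `unionedRows` helper, and the app name are illustrative choices, not part of the original post):

```scala
import org.apache.spark.sql.SparkSession

object UnionByNameExample {
  case class A(a: Int, b: Int)
  case class B(b: Int, a: Int)

  // Hypothetical helper: unions the question's data by column name and
  // returns the rows as (a, b) pairs.
  def unionedRows(spark: SparkSession): Seq[(Int, Int)] = {
    import spark.implicits._
    val aDF = Seq(A(1, 3), A(2, 4)).toDF()
    val bDF = Seq(B(5, 3), B(6, 4)).toDF()

    // Columns of bDF are matched to those of aDF by name, not by position,
    // so B(5, 3) (b = 5, a = 3) contributes the row a = 3, b = 5.
    aDF.unionByName(bDF)
      .collect()
      .map(row => (row.getInt(0), row.getInt(1)))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("unionByName-sketch")
      .getOrCreate()
    unionedRows(spark).foreach(println)
    spark.stop()
  }
}
```

Unlike unionAll, this yields the rows (1, 3), (2, 4), (3, 5), (4, 6) under the columns (a, b).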

Avishek Bhattacharya answered Sep 24 '22 16:09


This is not a bug. Look closely at case class B: case class A declares its fields in the order (a, b), while case class B declares them in the order (b, a). Since the union works by position, the output is as expected.

case class A (a: Int, b: Int)
case class B (b: Int, a: Int)

thanks, Subbu

SUBBAREDDY JANGALAPALLI answered Sep 23 '22 16:09