 

How to zip two (or more) DataFrame in Spark

I have two DataFrames, a and b. a looks like:

Column 1 | Column 2
abc      |  123
cde      |  23 

b looks like:

Column 1 
1      
2      

I want to zip a and b (or even more DataFrames) into something like:

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

How can I do it?

asked Oct 01 '15 by worldterminator



2 Answers

An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming this is the case:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
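
With the toy data above, ab should come out as the table asked for in the question; a quick check (output shape assumed from the usual show() rendering):

ab.show()
// +--------+--------+--------+
// |column_1|column_2|column_3|
// +--------+--------+--------+
// |     abc|     123|       1|
// |     cde|      23|       2|
// +--------+--------+--------+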

If the above conditions are not met, the only option that comes to mind is adding an index and joining:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")

answered Sep 18 '22 by zero323


In Scala's implementation of DataFrames, there is no simple way to concatenate two DataFrames into one. We can work around this limitation by adding an index column to each row of the DataFrames and then doing an inner join on these indices. This is my stub code for this implementation:

// monotonicallyIncreasingId lives in org.apache.spark.sql.functions
import org.apache.spark.sql.functions.monotonicallyIncreasingId

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id", monotonicallyIncreasingId)

val b: DataFrame = sc.parallelize(Seq(1, 2)).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id", monotonicallyIncreasingId)

aWithId.join(bWithId, "id")
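
To get exactly the shape asked for, drop the helper column after the join. Note that the ids generated by monotonicallyIncreasingId only line up when both DataFrames have the same partitioning, the same caveat as zipping RDDs in the first answer; that holds for the toy data here:

aWithId.join(bWithId, "id").drop("id").show()
// Yields the same three-column table as in the first answer.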

A little light reading - Check out how Python does this!

answered Sep 20 '22 by Sohum Sachdev