I have two DataFrames, a and b.
a is like
Column 1 | Column 2
abc | 123
cde | 23
b is like
Column 1
1
2
I want to zip a and b (or even more DataFrames) into something like:
Column 1 | Column 2 | Column 3
abc | 123 | 1
cde | 23 | 2
How can I do it?
RDDs are slower than both DataFrames and Datasets for simple operations such as grouping data. DataFrames provide an easy API for aggregation operations and perform aggregations faster than both RDDs and Datasets. Datasets are faster than RDDs but a bit slower than DataFrames.
Because PySpark executes operations in parallel on all cores of multiple machines, it can run operations on large data faster than Pandas, so we are often required to convert a Pandas DataFrame to a PySpark (Spark with Python) DataFrame for better performance. This is one of the major differences between Pandas and PySpark DataFrames.
An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work both RDDs must have the same number of partitions and the same number of elements in each partition. Assuming this is the case:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
// Needed for toDF outside the Spark shell, where it is imported automatically
import sqlContext.implicits._

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
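To make the two conditions concrete, here is a small plain-Scala sketch that needs no Spark. It only models a partitioned dataset as a sequence of partitions. ZipModel and zipPartitioned are hypothetical names invented for this illustration and are not part of the Spark API.

```scala
// Plain-Scala model of the zip constraint (assumption: a dataset is a Seq of partitions).
object ZipModel {
  // Hypothetical helper: pairs rows partition by partition and rejects
  // inputs whose partition layouts do not match.
  def zipPartitioned[A, B](left: Seq[Seq[A]], right: Seq[Seq[B]]): Seq[Seq[(A, B)]] = {
    require(left.length == right.length, "unequal numbers of partitions")
    left.zip(right).map { case (l, r) =>
      require(l.length == r.length, "unequal numbers of elements in a partition")
      l.zip(r)
    }
  }
}

// Same layout on both sides (two partitions, one row each): rows pair up.
val ok = ZipModel.zipPartitioned(Seq(Seq("abc"), Seq("cde")), Seq(Seq(1), Seq(2)))

// Same data, different layout (one partition vs. two): the zip is rejected.
val failed = scala.util.Try(
  ZipModel.zipPartitioned(Seq(Seq("abc", "cde")), Seq(Seq(1), Seq(2))))
```

The second call holds the same rows as the first, which is why row counts alone are not enough to tell whether two real RDDs can be zipped.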
If the above conditions are not met, the only option that comes to mind is adding an index and joining on it:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
In Scala's implementation of DataFrames, there is no simple way to concatenate two DataFrames into one. We can work around this limitation by adding an index to each row of both DataFrames and then doing an inner join on these indices. This is my stub code for this implementation:
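import org.apache.spark.sql.functions.monotonicallyIncreasingId

val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val aWithId: DataFrame = a.withColumn("id", monotonicallyIncreasingId)
val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
val bWithId: DataFrame = b.withColumn("id", monotonicallyIncreasingId)
// The generated ids are increasing but not consecutive and depend on partitioning,
// so they line up only if a and b have the same number of rows in each partition
aWithId.join(bWithId, "id")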
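One caveat with this approach: Spark documents the generated ID as holding the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The plain-Scala sketch below models only that layout, without Spark, to show what can happen when the two DataFrames are partitioned differently. MonotonicIdModel, assignIds and the partition layouts are assumptions made up for the illustration.

```scala
// Plain-Scala model of the documented ID layout (no Spark required).
object MonotonicIdModel {
  // Hypothetical helper: assigns an ID to each row of a dataset given as a Seq of partitions.
  // ID = (partition index << 33) + position of the row inside its partition.
  def assignIds[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] =
    for {
      (rows, partitionIndex) <- partitions.zipWithIndex
      (row, position) <- rows.zipWithIndex
    } yield ((partitionIndex.toLong << 33) + position, row)

  // The question's data with two different, made-up partition layouts.
  val aIds: Seq[(Long, String)] = assignIds(Seq(Seq("abc"), Seq("cde"))) // two partitions, one row each
  val bIds: Seq[(Long, Int)] = assignIds(Seq(Seq(1, 2)))                 // one partition, two rows

  // Inner join on the generated ID, mirroring aWithId.join(bWithId, "id").
  val joined: Seq[(String, Int)] = for {
    (idA, rowA) <- aIds
    (idB, rowB) <- bIds
    if idA == idB
  } yield (rowA, rowB)
}
```

In this model only the first row of each side shares an ID, so the inner join keeps ("abc", 1) and drops "cde". When the partition layouts cannot be guaranteed to match, an index built with zipWithIndex, as in the other answer, avoids the problem because its indices are consecutive across partitions.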
A little light reading - Check out how Python does this!