I have two large DataFrames that need to be merged on an association key. Using join
takes a long time to complete.
I have read that cogroup
is preferred over joins in Apache Spark. Can anyone show how to use cogroup
on DataFrames, or suggest a better approach for merging two large DataFrames?
Thank you
Spark >= 3.0
Since 3.0, Spark provides a PySpark-specific cogroup
using Pandas / Arrow. The general syntax is as follows:
left.cogroup(right).applyInPandas(f, schema)
where both left
and right
are GroupedData
objects and f
is a COGROUPED_MAP
user-defined function that takes two Pandas DataFrames
and returns a Pandas DataFrame:
import pandas as pd

# Plain Python function; the output schema is supplied at call time.
def f(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame: ...
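As a concrete illustration (the DataFrame names, the key column "id", and the other column names below are made up for this sketch), the cogrouped-map function itself is ordinary pandas code, so it can be written and tested locally before wiring it into Spark:

```python
import pandas as pd

# The cogrouped-map function receives, for one key, all matching rows from
# each side as two pandas DataFrames and returns a single pandas DataFrame.
def merge_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # A plain pandas merge on the (hypothetical) key column "id".
    return pd.merge(left, right, on="id")

# Local test data standing in for one cogrouped partition:
l = pd.DataFrame({"id": [1, 1, 2], "x": [10, 20, 30]})
r = pd.DataFrame({"id": [1, 2, 2], "y": ["a", "b", "c"]})
out = merge_groups(l, r)

# In Spark this function would be invoked per key, roughly as:
# df1.groupBy("id").cogroup(df2.groupBy("id")).applyInPandas(
#     merge_groups, schema="id long, x long, y string")
```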
Spark >= 1.6
At the JVM level, KeyValueGroupedDataset
provides both a Java
def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R]
and a Scala
def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R]
cogroup. It is, however, intended for "strongly" typed Datasets, not Dataset[Row]
, and is highly unlikely to contribute to your declared goal (performance improvement).
Spark < 1.6 (this part stays valid for later versions as well, with the exception of the small API additions listed above).
DataFrame
doesn't provide any equivalent of the cogroup
function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you have to either create a custom expression, which is not trivial, or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same join
logic as plain RDDs
.
Regarding RDDs: while there exist border cases where cogroup
can be favorable over join
, typically it shouldn't be, unless the result of the join approaches a Cartesian product of the complete datasets. After all, joins on RDDs are expressed as cogroup
followed by flatMapValues
, and since the latter operation is local, the only real overhead is the creation of the output tuples.
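To see why this equivalence holds, here is a minimal pure-Python sketch of the semantics (no Spark required; the pair lists and helper names are invented for illustration):

```python
from collections import defaultdict
from itertools import product

def cogroup(left, right):
    """Group the values of two (key, value) pair lists by key."""
    groups = defaultdict(lambda: ([], []))
    for k, v in left:
        groups[k][0].append(v)
    for k, v in right:
        groups[k][1].append(v)
    return groups.items()

def join_via_cogroup(left, right):
    """An inner join expressed as cogroup followed by a flatMapValues-style
    step: a local, per-key expansion into output tuples."""
    return [
        (k, pair)
        for k, (lvals, rvals) in cogroup(left, right)
        for pair in product(lvals, rvals)  # empty if a key is missing on one side
    ]

left = [("a", 1), ("a", 2), ("b", 3)]
right = [("a", "x"), ("c", "y")]
result = join_via_cogroup(left, right)
```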
If your tables contain only primitive types, you could mimic cogroup-like behavior by first aggregating the columns with collect_list
, but I wouldn't expect any performance gains here.
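A rough sketch of that idea in pandas (the frames and column names are hypothetical); in Spark the same shape would be df.groupBy("id").agg(collect_list("x")) on each side, followed by a single join on the now-unique keys:

```python
import pandas as pd

df1 = pd.DataFrame({"id": [1, 1, 2], "x": [10, 20, 30]})
df2 = pd.DataFrame({"id": [1, 2, 2], "y": ["a", "b", "c"]})

# Collapse each side to one row per key, gathering values into lists
# (the pandas analogue of Spark's collect_list aggregation).
left = df1.groupby("id")["x"].agg(list).rename("xs")
right = df2.groupby("id")["y"].agg(list).rename("ys")

# The join now matches at most one row per key on each side, which is
# the cogroup-like shape: (key, left values, right values).
cogrouped = pd.concat([left, right], axis=1).reset_index()
```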