
Cogroup on Spark DataFrames

I have two large DataFrames that need to be merged on an association key. Using join takes a long time to complete the task.

I see that using cogroup is preferred over joins in Apache Spark. Can anyone show how to use cogroup on DataFrames, or suggest a better approach for merging two large DataFrames?

Thank you

Asked by Kazhiyur

1 Answer

Spark >= 3.0

Since 3.0, Spark provides a PySpark-specific cogroup using Pandas / Arrow. The general syntax is as follows:

left.cogroup(right).apply(f)

where both left and right are GroupedData objects and f is a COGROUPED_MAP user-defined function that takes two Pandas DataFrames and returns a Pandas DataFrame:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import DataFrame as PandasDataFrame

# schema is the Spark SQL schema of the DataFrame returned by f
@pandas_udf(schema, PandasUDFType.COGROUPED_MAP)
def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
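In the released 3.x API the same operation is exposed on the cogrouped object as applyInPandas, which takes the function and the output schema together. Below is a minimal end-to-end sketch under that assumption; the keys, column names (id, left_val, right_val) and the merge logic are made up for illustration:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data.
left = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ("id", "left_val"))
right = spark.createDataFrame([(1, "x"), (2, "y"), (2, "z")], ("id", "right_val"))

def merge_groups(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Each call receives every row for a single key from both sides as
    # plain Pandas DataFrames, so arbitrary local merge logic can be applied.
    return pd.merge(l, r, on="id")

result = (
    left.groupBy("id")
    .cogroup(right.groupBy("id"))
    .applyInPandas(merge_groups, schema="id long, left_val string, right_val string")
)
result.show()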

Spark >= 1.6

On the JVM side, KeyValueGroupedDataset provides cogroup with both a Java signature

def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R] 

and a Scala signature

def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R] 

It is, however, intended for the "strongly" typed Dataset variants, not Dataset[Row], and is highly unlikely to help with your stated goal (performance improvement).

Spark < 1.6 (this part remains valid for later versions as well, apart from the small API additions listed above).

DataFrame doesn't provide any equivalent of the cogroup function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you either have to create a custom expression, which is not trivial, or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same join logic as plain RDDs.

Regarding RDDs: while there are edge cases where cogroup can be preferable to join, typically that is only when the result approaches a Cartesian product of the complete datasets. After all, joins on RDDs are expressed as cogroup followed by flatMapValues, and since the latter operation is local, the only real overhead is the creation of the output tuples (see the sketch below).
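To make that relationship concrete, here is a minimal sketch with made-up toy data showing that join is equivalent to cogroup followed by a local flatMapValues:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Toy pair RDDs; keys and values are made up for illustration.
left = sc.parallelize([(1, "a"), (2, "b")])
right = sc.parallelize([(1, "x"), (1, "y")])

# join(...) shuffles both sides and pairs matching values per key.
joined = left.join(right)

# cogroup(...) performs the same shuffle; the pairing then happens locally.
cogrouped = left.cogroup(right).flatMapValues(
    lambda lr: [(l, r) for l in lr[0] for r in lr[1]]
)

assert sorted(joined.collect()) == sorted(cogrouped.collect())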

If your tables contain only primitive types, you could mimic cogroup-like behavior by aggregating columns with collect_list first, but I wouldn't expect any performance gains here.
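A rough sketch of that workaround, assuming hypothetical table and column names (id, left_val, right_val):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical input tables with primitive-typed columns.
left = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ("id", "left_val"))
right = spark.createDataFrame([(1, "x"), (2, "y"), (2, "z")], ("id", "right_val"))

# Collapse each side to one row per key first, so the subsequent join is
# one-to-one and the grouped values travel as arrays, much like cogroup output.
left_grouped = left.groupBy("id").agg(F.collect_list("left_val").alias("left_vals"))
right_grouped = right.groupBy("id").agg(F.collect_list("right_val").alias("right_vals"))

cogroup_like = left_grouped.join(right_grouped, on="id", how="full_outer")
cogroup_like.show()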

Answered by zero323


