I have a large dataset of string ids that can fit into memory on a single node in my Spark cluster. The issue is that it consumes most of the memory of a single node.
These ids are about 30 characters long. For example:
ids
O2LWk4MAbcrOCWo3IVM0GInelSXfcG
HbDckDXCye20kwu0gfeGpLGWnJ2yif
o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
I am looking to write a list of all of the pairs of ids to a file. For example:
id1,id2
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,HbDckDXCye20kwu0gfeGpLGWnJ2yif
O2LWk4MAbcrOCWo3IVM0GInelSXfcG,o43xSMBUJLOKDxkYEQbAEWk4aPQHkm
HbDckDXCye20kwu0gfeGpLGWnJ2yif,O2LWk4MAbcrOCWo3IVM0GInelSXfcG
# etc...
So I need to cross join the dataset with itself. I was hoping to do this in PySpark on a 10-node cluster, but it needs to be memory efficient.
To make the computation faster, reduce the number of partitions of the input DataFrames before the cross join, so that the resulting cross-joined DataFrame doesn't end up with too many partitions (the output has roughly the product of the two inputs' partition counts), as sketched below.
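A minimal sketch of that idea. The file name input.csv, the column name id, and the partition count of 10 are assumptions for illustration, not part of the question:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
ids = spark.read.csv('input.csv', header=True)  # hypothetical single-column file of ids

# A cross join yields roughly left_partitions * right_partitions output partitions,
# so coalesce each side first to keep the result's partition count manageable.
left = ids.withColumnRenamed('id', 'id1').coalesce(10)    # 10 is an illustrative choice
right = ids.withColumnRenamed('id', 'id2').coalesce(10)
pairs = left.crossJoin(right)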
By default, Spark blocks implicit Cartesian products: a SQL query that joins N tables is expected to have N-1 join conditions, and a missing or trivial join condition raises a "Join condition is missing or trivial" error. To produce the product deliberately, either use the explicit crossJoin API or set spark.sql.crossJoin.enabled=true in your Spark driver application; both options are sketched below.
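A hedged sketch of both escape hatches. spark.sql.crossJoin.enabled is the Spark 2.x setting (Spark 3.0+ allows Cartesian products by default), and the tiny in-memory DataFrame is only for illustration:

from pyspark.sql import SparkSession

# Option 1: allow implicit cartesian products via configuration (Spark 2.x).
spark = SparkSession.builder.config('spark.sql.crossJoin.enabled', 'true').getOrCreate()

# Option 2: request the product explicitly, which works regardless of that setting.
df = spark.createDataFrame([('a',), ('b',), ('c',)], ['id'])
df.crossJoin(df.withColumnRenamed('id', 'id2')).show()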
In Spark, the cartesian() transformation generates the Cartesian product of two datasets and returns all possible pairs: each element of one dataset is paired with every element of the other.
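A tiny RDD-level illustration of cartesian() with made-up values (not the questioner's data), reusing a SparkSession obtained as in the previous snippet:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(['a', 'b', 'c'])
# cartesian() pairs every element of one RDD with every element of the other,
# so the result here has 3 * 3 = 9 tuples.
print(rdd.cartesian(rdd).collect())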
PySpark will handle your dataset easily and memory-efficiently, but it will take time to process roughly 10^8 * 10^8 = 10^16 records (the estimated size of the cross join result). See the sample code:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()  # spark is already defined in the pyspark shell
df = spark.read.csv('input.csv', header=True, schema=StructType([StructField('id', StringType())]))
df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2')).show()
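Since the goal is to write the pairs to a file rather than display them, a possible follow-up to the snippet above. The output path and the id1 != id2 filter for dropping self-pairs are assumptions based on the example output in the question:

from pyspark.sql import functions as F

pairs = df.withColumnRenamed('id', 'id1').crossJoin(df.withColumnRenamed('id', 'id2'))
# Drop self-pairs such as (x, x), which the example output above omits,
# and write the result as CSV from the executors instead of collecting it to the driver.
pairs.filter(F.col('id1') != F.col('id2')).write.csv('id_pairs_output', header=True)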