Is it possible to Join two RDDs in Spark on a custom function? I have two big RDDs with a string as key. I want to join them not using the classic Join but a custom function like:
def my_func(a,b):
    return Lev.distance(a,b) < 2
result_rdd = rdd1.join(rdd2, my_func)
If it's not possible, is there any alternative that will continue to use the benefits of spark clusters? I wrote something like this but pyspark will not be able to distribuite the work on my small cluster.
def custom_join(rdd1, rdd2, my_func):
    a = rdd1.sortByKey().collect()
    b = rdd2.sortByKey().collect()
    i = 0
    j = 0
    res = []
    while i < len(a) and j < len(b):
        if my_func(a[i][0],b[j][0]):
            res += [((a[i][0],b[j][0]),(a[i][1],b[j][1]))]
            i+=1
            j+=1
        elif a[i][0] < b[j][0]:
            i+=1
        else:
            j+=1
    return sc.parallelize(res)
Thanks in advance (and sorry for my english because I'm italian)
You can use cartesian and then filter based on conditions.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("b", 3)])
def customFunc(x):
    # You may use any condition here
    return x[0][0] ==x[1][0]
print(x.join(y).collect()) # normal join
# replicating join with cartesian
print(x.cartesian(y).filter(customFunc).flatMap(lambda x:x).groupByKey().mapValues(tuple).collect())
Output:
[('b', (4, 3)), ('a', (1, 2))]
[('a', (1, 2)), ('b', (4, 3))]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With