
Fuzzy join between two large datasets in Spark

I need to do a fuzzy join between two large datasets (assume about 30 GB each) based on the similarity of two string columns. For example:

Table 1:

Key1  |Value1
-------------
1     |qsdm fkq jmsk fqj msdk

Table 2:

Key2  |Value2
-------------
1     |qsdm fkqj mskf qjm sdk

The aim is to compute the cosine similarity between each row of Value1 and each row of Value2, and then to join the two tables on the pairs whose similarity passes a predefined threshold.

Keywords: entity resolution, cosine similarity, inverted index (to reduce the number of similarity computations), TF-IDF, token weight, word, document (a cell in the value column), dataset

I use Spark (PySpark) to compute the join. At this point in the process, I have:

  • an RDD RDD1 of (key1, dict1): key1 is the key of table1, dict1 is a dictionary mapping each word to its weight over dataset table1 (a weight vector)
  • an RDD RDD2 of (key2, dict2): key2 is the key of table2, dict2 is a dictionary mapping each word to its weight over dataset table2 (a weight vector)
  • an RDD NORM1 of (key1, norm1): key1 is the key of table1, norm1 is a value pre-computed over dict1
  • an RDD NORM2 of (key2, norm2): key2 is the key of table2, norm2 is a value pre-computed over dict2

Using an inverted index, I have reduced the number of similarity computations between pairs of documents (strings). The result is an RDD CommonTokens of ((key1, key2), tokens): key1 is a key in table1, key2 is a key in table2, and tokens is the list of tokens common to value1 and value2. For each element of CommonTokens, I compute the cosine similarity to generate ((key1, key2), similarity).
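For reference, the per-pair computation described above can be sketched in plain Python. This is a minimal sketch, assuming norm1 and norm2 are the Euclidean (L2) norms of dict1 and dict2; the function names are hypothetical and not taken from the original job.

```python
import math


def l2_norm(weights):
    """Euclidean norm of a {token: weight} dictionary."""
    return math.sqrt(sum(w * w for w in weights.values()))


def cosine_from_common_tokens(common_tokens, weights1, norm1, weights2, norm2):
    """Cosine similarity of two documents given their shared tokens.

    Only tokens present in both documents contribute to the dot product,
    so iterating over common_tokens is enough.
    """
    if norm1 == 0 or norm2 == 0:
        # An empty document has no direction; treat it as dissimilar.
        return 0.0
    dot = sum(weights1[t] * weights2[t] for t in common_tokens)
    return dot / (norm1 * norm2)


# Toy weight vectors (made-up values, for illustration only).
dict1 = {"qsdm": 3.0, "fkq": 4.0}
dict2 = {"qsdm": 3.0, "fkqj": 4.0}
similarity = cosine_from_common_tokens(
    ["qsdm"], dict1, l2_norm(dict1), dict2, l2_norm(dict2)
)
```

Because the norms are pre-computed, each pair costs only as much as the length of its common-token list.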

In Spark, I did:

  • collectAsMap on RDD1, NORM1, RDD2 and NORM2 to build 4 dictionaries
  • create a function similarity:

    • input: (key1, key2, commonTokens)
    • look up key1 in RDD1 and NORM1, and key2 in RDD2 and NORM2
    • compute the cosine similarity
    • return (key1, key2, similarity)
  • apply a map over CommonTokens with the similarity function defined above

  • Configuration to submit my job to YARN:

spark-submit --master yarn-client --executor-cores 3 --executor-memory 20G --driver-memory 20G --driver-cores 12 --queue cku --num-executors 6 run/Join.py &

Problems in Spark:

  • the many collectAsMap calls ==> overload the driver ==> deadlock
  • I cannot run an RDD transformation inside another RDD transformation (that is, instead of using collectAsMap, use RDD1, RDD2, NORM1 and NORM2 directly to look up key1 and key2 inside CommonTokens.map)
  • I tried to "convert" RDD1, RDD2, NORM1 and NORM2 to DataFrames and use Spark SQL to "select" (look up) the values, but that did not work inside the map either
  • a bonus question: is my algorithm efficient for my case?
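One possible way around the collectAsMap calls and the nested-RDD restriction is to express every lookup as a join, so that nothing is collected on the driver. The sketch below is an assumption-laden illustration, not the original job: it assumes the norms are non-zero Euclidean norms, that RDD1/NORM1 and RDD2/NORM2 are pair RDDs keyed as described above, and all function names are hypothetical. Joining the two posting lists on the token plays the role of the inverted index, so the separate CommonTokens step is not needed here.

```python
def normalized_postings(record):
    """(key, (weights, norm)) -> [(token, (key, weight / norm)), ...]"""
    key, (weights, norm) = record
    return [(token, (key, w / norm)) for token, w in weights.items()]


def pair_contribution(joined):
    """(token, ((key1, w1), (key2, w2))) -> ((key1, key2), w1 * w2)"""
    _token, ((key1, w1), (key2, w2)) = joined
    return ((key1, key2), w1 * w2)


def cosine_join(rdd1, norm1, rdd2, norm2, threshold):
    """Return an RDD of ((key1, key2), similarity) with similarity >= threshold.

    rdd1/rdd2 hold (key, {token: weight}); norm1/norm2 hold (key, norm).
    """
    # Attach each norm to its weight dictionary, then emit one
    # pre-normalized posting per (token, document).
    postings1 = rdd1.join(norm1).flatMap(normalized_postings)
    postings2 = rdd2.join(norm2).flatMap(normalized_postings)
    # Documents that share a token meet in the join; summing the products
    # of their normalized weights per (key1, key2) gives the cosine.
    return (postings1.join(postings2)
            .map(pair_contribution)
            .reduceByKey(lambda a, b: a + b)
            .filter(lambda kv: kv[1] >= threshold))
```

One caveat of this design: a token that occurs in many documents on both sides produces a very large number of joined pairs, so very frequent tokens may need to be dropped or handled separately before the join.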

Thanks for any suggestions. (Sorry for my English; feel free to ask me for further information if my question is not clear.)

asked Mar 01 '26 06:03 by minh-hieu.pham


1 Answer

You likely want to look into Locality-Sensitive Hashing (LSH). Fortunately, Spark already implements it. It reduces the number of comparisons and gives you the Euclidean distance between the two vectors (note that this is Euclidean distance, not cosine similarity). The only real caveat is that all the vectors have to be the same length (the same number of dimensions), but it seems it would give you what you want with less work.

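from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create (or reuse) a session; the pyspark shell already provides `spark`.
spark = SparkSession.builder.appName("lsh-example").getOrCreate()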

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
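
The question asks for a cosine threshold, whereas approxSimilarityJoin takes a Euclidean distance threshold. For vectors scaled to unit L2 norm, ||a - b||^2 = 2 * (1 - cos(a, b)), so a minimum cosine similarity maps to a maximum Euclidean distance. A minimal sketch of that conversion follows; the function name is hypothetical, and it assumes the feature vectors were L2-normalized beforehand (for example with pyspark.ml.feature.Normalizer).

```python
import math


def cosine_to_euclidean_threshold(min_cosine):
    """Largest Euclidean distance between two unit vectors whose
    cosine similarity is at least min_cosine."""
    if not -1.0 <= min_cosine <= 1.0:
        raise ValueError("cosine similarity must lie in [-1, 1]")
    # For unit vectors: ||a - b||^2 = 2 - 2 * cos(a, b)
    return math.sqrt(2.0 - 2.0 * min_cosine)


# Example: the distance bound that corresponds to cosine >= 0.5.
max_distance = cosine_to_euclidean_threshold(0.5)
```

The resulting value could be passed as the threshold argument of approxSimilarityJoin in place of 1.5. Since LSH is approximate, some pairs within the threshold may still be missed.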

answered Mar 04 '26 06:03 by Matt Andruff


