I have a dataset containing workers with their demographic information (age, gender, address, etc.) and their work locations. I created an RDD from the dataset and converted it into a DataFrame.
There are multiple entries for each ID. Hence, I created a DataFrame which contained only the ID of the worker and the various office locations that he/she had worked at:
| **ID** | **Office_Loc**             |
|--------|----------------------------|
| 1      | Delhi, Mumbai, Gandhinagar |
| 2      | Delhi, Mandi               |
| 3      | Hyderbad, Jaipur           |
I want to calculate the cosine similarity between each worker and every other worker based on their office locations.
So I iterated through the rows of the DataFrame, first retrieving a single row:
myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
          # keep the row whose position matches myIndex, then drop the index
          .filter(lambda pair: pair[1] == myIndex)
          .map(lambda pair: pair[0])
          .collect())
and then using map
cos_weight = ID_place_df.select("ID", "office_location").rdd\
    .map(lambda x: get_cosine(values, x[0], x[1]))
to calculate the cosine similarity between the extracted row and the whole DataFrame.
I do not think my approach is a good one, since iterating through the rows of the DataFrame defeats the whole purpose of using Spark. Is there a better way to do it in pyspark? Kindly advise.
Tf-idf is a transformation you apply to texts to get real-valued feature vectors. You can then obtain the cosine similarity of any pair of vectors by taking their dot product and dividing that by the product of their norms. That yields the cosine of the angle between the vectors.
Cosine similarity measures the similarity between two vectors of an inner product space: it is the cosine of the angle between the two vectors, and it tells you whether the two vectors point in roughly the same direction. For vectors A and B it is computed as

cos(A, B) = A.B / (||A|| * ||B||)

where A.B is the dot product of A and B (the sum of the element-wise products of A and B), and ||A|| is the L2 norm of A (the square root of the sum of the squares of the elements of A).
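For instance, a minimal NumPy sketch of this formula (illustrative only, separate from the Spark pipeline below):

import numpy as np

def cosine_similarity(a, b):
    # dot product divided by the product of the L2 norms
    return a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b))

a = np.array([1.0, 2.0, 0.0])
b = np.array([2.0, 1.0, 1.0])
print(cosine_similarity(a, b))  # cosine of the angle between a and b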
You can use the mllib package to compute the L2 norm of the TF-IDF vector of every row, and then multiply the matrix with itself: since every row is L2-normalized, the pairwise dot products are exactly the cosine similarities:
1. RDD
rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
Compute TF-IDF:
documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
You can specify the number of features in HashingTF to make the feature matrix smaller (fewer columns).
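For example, a smaller feature space could be requested like this before calling transform (the value 2**10 is just illustrative; the default is 2**20):

hashingTF = HashingTF(numFeatures=2**10)  # hash office names into 1024 buckets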
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
Compute L2 norm:
from pyspark.mllib.feature import Normalizer
labels = rdd.map(lambda l: l[0])
features = tfidf
normalizer = Normalizer()
data = labels.zip(normalizer.transform(features))
Compute cosine similarity by multiplying the matrix with itself:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(data).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()
array([[ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  1.        ,  0.10794634,  0.        ],
       [ 0.        ,  0.10794634,  1.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  1.        ]])
The matrix is 4 x 4 because the IDs start at 1, so IndexedRowMatrix leaves row and column 0 empty.
OR: using a Cartesian product and the dot function on numpy arrays:
data.cartesian(data)\
    .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
    .sortByKey()\
    .collect()
[((1, 1), 1.0),
((1, 2), 0.10794633570596117),
((1, 3), 0.0),
((2, 1), 0.10794633570596117),
((2, 2), 1.0),
((2, 3), 0.0),
((3, 1), 0.0),
((3, 2), 0.0),
((3, 3), 1.0)]
2. DataFrame
Since you seem to be using DataFrames, you can use the spark ml package instead:
import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
.withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
Compute TF-IDF:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
tf = hashingTF.transform(df)
idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)
Compute L2 norm:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="feature", outputCol="norm")
data = normalizer.transform(tfidf)
Compute matrix product:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
mat = IndexedRowMatrix(
    data.select("ID", "norm")
        .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()
OR: using a join and a UDF for the dot function (the condition i.ID < j.ID computes each pair only once):
from pyspark.sql.types import DoubleType
dot_udf = psf.udf(lambda x, y: float(x.dot(y)), DoubleType())
data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
.select(
psf.col("i.ID").alias("i"),
psf.col("j.ID").alias("j"),
dot_udf("i.norm", "j.norm").alias("dot"))\
.sort("i", "j")\
.show()
+---+---+-------------------+
| i| j| dot|
+---+---+-------------------+
| 1| 2|0.10794633570596117|
| 1| 3| 0.0|
| 2| 3| 0.0|
+---+---+-------------------+
This tutorial lists different methods to multiply large-scale matrices: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e
Regarding this issue: since I'm working on a project with pyspark where I have to use cosine similarity, I have to say that @MaFF's code is correct. Indeed, I hesitated when I saw his code, because he was taking the dot product of the vectors' L2-normalized forms, while the theory says: mathematically, cosine similarity is the ratio of the dot product of the vectors to the product of the magnitudes of the two vectors.
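The two formulations actually agree: if u = A/||A|| and v = B/||B||, then u.v = A.B / (||A|| * ||B||), which is exactly the cosine formula. A quick NumPy check of this identity (the vectors here are illustrative):

import numpy as np

A = np.array([1.0, 3.0, 2.0])
B = np.array([2.0, 1.0, 0.0])

full = A.dot(B) / (np.linalg.norm(A) * np.linalg.norm(B))
u = A / np.linalg.norm(A)
v = B / np.linalg.norm(B)

print(np.isclose(full, u.dot(v)))  # True: normalized dot product == cosine formula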
And here is my code, adapted to give the same results. From this I came to the conclusion that sklearn calculates tf-idf in a different way, so if you try to reproduce this exercise using sklearn, you will get a different result.
import pyspark.sql.functions as F

d = [{'id': '1', 'office': 'Delhi, Mumbai, Gandhinagar'}, {'id': '2', 'office': 'Delhi, Mandi'}, {'id': '3', 'office': 'Hyderbad, Jaipur'}]
df_fussion = spark.createDataFrame(d)
df_fussion = df_fussion.withColumn('office', F.split('office', ', '))
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="office", outputCol="tf")
tf = hashingTF.transform(df_fussion)
idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
data = idf.transform(tf)
from pyspark.sql.types import DoubleType

@F.udf(returnType=DoubleType())
def sim_cos(v1, v2):
    try:
        p = 2
        return float(v1.dot(v2)) / float(v1.norm(p) * v2.norm(p))
    except Exception:
        # return 0.0 for empty or invalid vectors
        return 0.0
result = data.alias("i").join(data.alias("j"), F.col("i.ID") < F.col("j.ID"))\
    .select(
        F.col("i.ID").alias("i"),
        F.col("j.ID").alias("j"),
        sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .sort("i", "j")
result.show()
I also want to share some simple tests that I did with simple vectors, where the results are correct:
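For example, a quick check along those lines (the vectors below are illustrative, not my original test data) could look like this:

from pyspark.ml.linalg import Vectors

test = spark.createDataFrame([
    (1, Vectors.dense([1.0, 0.0])),
    (2, Vectors.dense([1.0, 0.0])),  # same direction as ID 1 -> similarity 1.0
    (3, Vectors.dense([0.0, 1.0])),  # orthogonal to the others -> similarity 0.0
], ["ID", "feature"])

test.alias("i").join(test.alias("j"), F.col("i.ID") < F.col("j.ID"))\
    .select(F.col("i.ID").alias("i"), F.col("j.ID").alias("j"),
            sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .sort("i", "j").show()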
Kind regards,