
How to normalize and create similarity matrix in Pyspark?

I have seen many Stack Overflow questions about similarity matrices, but they deal with RDDs or other cases. I could not find a direct answer to my problem, so I decided to post a new question.

Problem

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import functions as F, Window
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler, Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

# pandas dataframe
pdf = pd.DataFrame({'user_id': ['user_0','user_1','user_2'],
                   'apple': [0,1,5],
                   'good banana': [3,0,1],
                   'carrot': [1,2,2]})
# spark dataframe
df = sqlContext.createDataFrame(pdf)
df.show()

+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0|    0|          3|     1|
| user_1|    1|          0|     2|
| user_2|    5|          1|     2|
+-------+-----+-----------+------+

Normalize and create Similarity Matrix using Pandas

from sklearn.preprocessing import normalize

pdf = pdf.set_index('user_id')
item_norm = normalize(pdf, axis=0)  # normalize each item column (NOT each user row)
item_sim = item_norm.T.dot(item_norm)  # dot products of unit vectors = cosine similarity
df_item_sim = pd.DataFrame(item_sim, index=pdf.columns, columns=pdf.columns)

                apple  good banana    carrot
apple        1.000000     0.310087  0.784465
good banana  0.310087     1.000000  0.527046
carrot       0.784465     0.527046  1.000000
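Because each item column has unit L2 norm after normalize, item_norm.T.dot(item_norm) is exactly the pairwise cosine similarity between items. As a quick sanity check (a sketch, using scikit-learn's cosine_similarity on the transposed frame):

from sklearn.metrics.pairwise import cosine_similarity

# rows of pdf.T are items, so this yields the same item-item matrix as above
print(cosine_similarity(pdf.T))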

Question: how can I get the similarity matrix shown above using PySpark?

I want to run KMeans on that data.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# I want to do this...
model = KMeans(k=2, seed=1, featuresCol='norm_features').fit(df.select('norm_features'))

df = model.transform(df)
df.show()

References

  • Cosine Similarity for two pyspark dataframes
  • Apache Spark Python Cosine Similarity over DataFrames
Asked Feb 24 '21 by BhishanPoudel



1 Answer

import pyspark.sql.functions as F

df.show()
+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0|    0|          3|     1|
| user_1|    1|          0|     2|
| user_2|    5|          1|     2|
+-------+-----+-----------+------+

Swap rows and columns by unpivoting and pivoting:

df2 = df.selectExpr(
    'user_id',
    'stack(3, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (fruit, items)'
).groupBy('fruit').pivot('user_id').agg(F.first('items'))

df2.show()
+-----------+------+------+------+
|      fruit|user_0|user_1|user_2|
+-----------+------+------+------+
|      apple|     0|     1|     5|
|good banana|     3|     0|     1|
|     carrot|     1|     2|     2|
+-----------+------+------+------+
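For reference, the string built by the join above expands to this Spark SQL expression for the three example columns:

stack(3, 'apple', `apple`, 'good banana', `good banana`, 'carrot', `carrot`) as (fruit, items)

stack(n, k1, v1, ..., kn, vn) emits one (fruit, items) row per column, and the groupBy/pivot then turns the user IDs back into columns.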

Normalize each fruit row (item vector) to unit L2 norm:

df3 = df2.select(
    'fruit',
    *[
        (
            F.col(c) / 
            F.sqrt(
                sum([F.col(cc)*F.col(cc) for cc in df2.columns[1:]])
            )
        ).alias(c) for c in df2.columns[1:]
    ]
)

df3.show()
+-----------+------------------+-------------------+-------------------+
|      fruit|            user_0|             user_1|             user_2|
+-----------+------------------+-------------------+-------------------+
|      apple|               0.0|0.19611613513818404| 0.9805806756909202|
|good banana|0.9486832980505138|                0.0|0.31622776601683794|
|     carrot|0.3333333333333333| 0.6666666666666666| 0.6666666666666666|
+-----------+------------------+-------------------+-------------------+
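The same step can also be done with the pyspark.ml classes the question already imports. This is only a sketch: the column names 'features' and 'norm_features' are my own, and the result is a single vector column rather than separate user columns, so it would need to be unpacked before the pivot-based multiplication below.

from pyspark.ml.feature import VectorAssembler, Normalizer

assembler = VectorAssembler(inputCols=df2.columns[1:], outputCol='features')
normalizer = Normalizer(inputCol='features', outputCol='norm_features', p=2.0)  # unit L2 norm per row
df3_ml = normalizer.transform(assembler.transform(df2))
df3_ml.select('fruit', 'norm_features').show(truncate=False)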

Do the matrix multiplication (each entry is the dot product of two unit vectors, i.e. their cosine similarity):

df4 = (df3.alias('t1').repartition(10)
          .crossJoin(df3.alias('t2').repartition(10))
          .groupBy('t1.fruit')
          .pivot('t2.fruit', df.columns[1:])
          .agg(F.first(sum([F.col('t1.'+c) * F.col('t2.'+c) for c in df3.columns[1:]])))
      )
df4.show()
+-----------+-------------------+-------------------+------------------+
|      fruit|              apple|        good banana|            carrot|
+-----------+-------------------+-------------------+------------------+
|      apple| 1.0000000000000002|0.31008683647302115|0.7844645405527362|
|good banana|0.31008683647302115| 0.9999999999999999|0.5270462766947298|
|     carrot| 0.7844645405527362| 0.5270462766947298|               1.0|
+-----------+-------------------+-------------------+------------------+
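Since the original goal was KMeans, the normalized item vectors in df3 can be assembled into a vector column and clustered directly. A minimal sketch, assuming the pyspark.ml defaults featuresCol='features' and the 'prediction' output column:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

assembled = VectorAssembler(inputCols=df3.columns[1:], outputCol='features').transform(df3)
model = KMeans(k=2, seed=1).fit(assembled)  # featuresCol='features' is the default
clustered = model.transform(assembled)      # adds a 'prediction' column
clustered.select('fruit', 'prediction').show()

For larger matrices, note that pyspark.mllib's RowMatrix.columnSimilarities() (reachable from the IndexedRowMatrix imported in the question via toRowMatrix()) computes the same item-item cosine similarities in a distributed, upper-triangular form.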
Answered Oct 25 '22 by mck