I have seen many Stack Overflow questions about similarity matrices, but they deal with RDDs or other cases that do not match mine, and I could not find a direct answer to my problem, so I decided to post a new question.
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import functions as F, Window
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler,Normalizer
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
spark = pyspark.sql.SparkSession.builder.appName('app').getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# pandas dataframe
pdf = pd.DataFrame({'user_id': ['user_0', 'user_1', 'user_2'],
                    'apple': [0, 1, 5],
                    'good banana': [3, 0, 1],
                    'carrot': [1, 2, 2]})
# spark dataframe
df = sqlContext.createDataFrame(pdf)
df.show()
+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0| 0| 3| 1|
| user_1| 1| 0| 2|
| user_2| 5| 1| 2|
+-------+-----+-----------+------+
from sklearn.preprocessing import normalize
pdf = pdf.set_index('user_id')
item_norm = normalize(pdf, axis=0)  # normalize each item column (NOT each user row)
item_sim = item_norm.T.dot(item_norm)
df_item_sim = pd.DataFrame(item_sim,index=pdf.columns,columns=pdf.columns)
apple good banana carrot
apple 1.000000 0.310087 0.784465
good banana 0.310087 1.000000 0.527046
carrot 0.784465 0.527046 1.000000
I want to run KMeans on that data.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# Ideally I want to do something like this, but df has no 'norm_features' column yet:
model = KMeans(k=2, seed=1).fit(df.select('norm_features'))
df = model.transform(df)
df.show()
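In case it helps, here is roughly how I imagine building that norm_features column (just a sketch on my side, using VectorAssembler plus the L2 Normalizer over the item columns; I am not sure it is the right approach):
from pyspark.ml.feature import VectorAssembler, Normalizer
# assemble the item columns into one vector per user
assembler = VectorAssembler(inputCols=['apple', 'good banana', 'carrot'], outputCol='features')
assembled = assembler.transform(df)
# L2-normalize each user's vector (p=2.0 is the default)
normalizer = Normalizer(inputCol='features', outputCol='norm_features', p=2.0)
normalized = normalizer.transform(assembled)
normalized.select('user_id', 'norm_features').show(truncate=False)
I guess KMeans would then also need featuresCol='norm_features' instead of the default 'features'.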
import pyspark.sql.functions as F
df.show()
+-------+-----+-----------+------+
|user_id|apple|good banana|carrot|
+-------+-----+-----------+------+
| user_0| 0| 3| 1|
| user_1| 1| 0| 2|
| user_2| 5| 1| 2|
+-------+-----+-----------+------+
Transpose the table (swap rows and columns) by unpivoting with stack and then pivoting on user_id:
# builds: stack(3, 'apple', `apple`, 'good banana', `good banana`, 'carrot', `carrot`) as (fruit, items)
df2 = df.selectExpr(
'user_id',
'stack(3, ' + ', '.join(["'%s', `%s`" % (c, c) for c in df.columns[1:]]) + ') as (fruit, items)'
).groupBy('fruit').pivot('user_id').agg(F.first('items'))
df2.show()
+-----------+------+------+------+
| fruit|user_0|user_1|user_2|
+-----------+------+------+------+
| apple| 0| 1| 5|
|good banana| 3| 0| 1|
| carrot| 1| 2| 2|
+-----------+------+------+------+
Normalize each item row to unit L2 length:
# divide every user coordinate by the row's L2 norm (sqrt of the sum of squares across user columns)
df3 = df2.select(
'fruit',
*[
(
F.col(c) /
F.sqrt(
sum([F.col(cc)*F.col(cc) for cc in df2.columns[1:]])
)
).alias(c) for c in df2.columns[1:]
]
)
df3.show()
+-----------+------------------+-------------------+-------------------+
| fruit| user_0| user_1| user_2|
+-----------+------------------+-------------------+-------------------+
| apple| 0.0|0.19611613513818404| 0.9805806756909202|
|good banana|0.9486832980505138| 0.0|0.31622776601683794|
| carrot|0.3333333333333333| 0.6666666666666666| 0.6666666666666666|
+-----------+------------------+-------------------+-------------------+
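As a quick sanity check, each item row should now have unit L2 norm (the 'squared_norm' alias below is just illustrative):
# every (fruit) row should have a squared norm of 1.0
df3.select(
    'fruit',
    sum([F.col(c) * F.col(c) for c in df3.columns[1:]]).alias('squared_norm')
).show()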
Compute the cosine similarities with a matrix multiplication (cross join plus dot product):
# pairwise dot products of the unit-length item rows give the cosine similarities;
# groupBy/pivot reshapes the pairs back into a square item-by-item matrix
df4 = (df3.alias('t1').repartition(10)
.crossJoin(df3.alias('t2').repartition(10))
.groupBy('t1.fruit')
.pivot('t2.fruit', df.columns[1:])
.agg(F.first(sum([F.col('t1.'+c) * F.col('t2.'+c) for c in df3.columns[1:]])))
)
df4.show()
+-----------+-------------------+-------------------+------------------+
| fruit| apple| good banana| carrot|
+-----------+-------------------+-------------------+------------------+
| apple| 1.0000000000000002|0.31008683647302115|0.7844645405527362|
|good banana|0.31008683647302115| 0.9999999999999999|0.5270462766947298|
| carrot| 0.7844645405527362| 0.5270462766947298| 1.0|
+-----------+-------------------+-------------------+------------------+
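These values match the pandas/sklearn result above. If you also want to run KMeans as in the question, one possible sketch (not the only way) is to assemble the normalized rows of df3 into a vector column and cluster on that, with k=2 as in the question and the column names below chosen just for illustration:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
# assemble the normalized per-user coordinates of each item into one vector
assembler = VectorAssembler(inputCols=df3.columns[1:], outputCol='norm_features')
df3_vec = assembler.transform(df3)
# cluster the items; featuresCol must point at the assembled column
kmeans = KMeans(k=2, seed=1, featuresCol='norm_features', predictionCol='cluster')
model = kmeans.fit(df3_vec)
model.transform(df3_vec).select('fruit', 'cluster').show()
The same pattern works on df4 if you would rather cluster on the similarity rows instead.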