I am building a news recommendation system and I need a table of users and the news articles they have read. My raw data looks like this:
001436800277225 ["9161492","9161787","9378531"]
009092130698762 ["9394697"]
010003000431538 ["9394697","9426473","9428530"]
010156461231357 ["9350394","9414181"]
010216216021063 ["9173862","9247870"]
010720006581483 ["9018786"]
011199797794333 ["9017977","9091134","9142852","9325464","9331913"]
011337201765123 ["9161294","9198693"]
011414545455156 ["9168185","9178348","9182782","9359776"]
011425002581540 ["9083446","9161294","9309432"]
and I use Spark SQL to explode the list and one-hot encode it:
from pyspark.sql.functions import explode
from pyspark.ml.feature import StringIndexer, OneHotEncoder

df = getdf()
# one row per (uuid, news) pair
df1 = df.select('uuid', explode('news').alias('news'))
# map each news id to a numeric index
stringIndexer = StringIndexer(inputCol="news", outputCol="newsIndex")
model = stringIndexer.fit(df1)
indexed = model.transform(df1)
# Spark 2.x OneHotEncoder is a Transformer (no fit step); Spark 3.0+ requires .fit()
encoder = OneHotEncoder(inputCol="newsIndex", outputCol="newsVec")
encoded = encoder.transform(indexed)
encoded.show(20, False)
After that, my data becomes:
+---------------+-------+---------+----------------------+
|uuid |news |newsIndex|newsVec |
+---------------+-------+---------+----------------------+
|014324000386050|9398253|10415.0 |(105721,[10415],[1.0])|
|014324000386050|9428530|70.0 |(105721,[70],[1.0]) |
|014324000631752|654112 |1717.0 |(105721,[1717],[1.0]) |
|014324000674240|730531 |2282.0 |(105721,[2282],[1.0]) |
|014324000674240|694306 |1268.0 |(105721,[1268],[1.0]) |
|014324000674240|712016 |4766.0 |(105721,[4766],[1.0]) |
|014324000674240|672307 |7318.0 |(105721,[7318],[1.0]) |
|014324000674240|698073 |1241.0 |(105721,[1241],[1.0]) |
|014324000674240|728044 |5302.0 |(105721,[5302],[1.0]) |
|014324000674240|672256 |1619.0 |(105721,[1619],[1.0]) |
|014324000674240|730236 |2376.0 |(105721,[2376],[1.0]) |
|014324000674240|730235 |14274.0 |(105721,[14274],[1.0])|
|014324000674240|728509 |1743.0 |(105721,[1743],[1.0]) |
|014324000674240|704528 |10310.0 |(105721,[10310],[1.0])|
|014324000715399|774134 |8876.0 |(105721,[8876],[1.0]) |
|014324000725836|9357431|3479.0 |(105721,[3479],[1.0]) |
|014324000725836|9358028|15621.0 |(105721,[15621],[1.0])|
|014324000730349|812106 |4599.0 |(105721,[4599],[1.0]) |
|014324000730349|699237 |754.0 |(105721,[754],[1.0]) |
|014324000730349|748109 |4854.0 |(105721,[4854],[1.0]) |
+---------------+-------+---------+----------------------+
But one uuid has multiple rows, so I want to groupBy('uuid') and then add these vectors together. However, simply using groupBy and then summing gives an error. How can I do that?
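The error comes from aggregating the vector column with a numeric aggregator: Spark's built-in sum only accepts numeric columns, not a VectorUDT. A minimal sketch of the failing attempt (DataFrame and column names taken from the code above):

from pyspark.sql.functions import sum as spark_sum

# Fails with an AnalysisException: sum requires numeric types,
# and newsVec is a VectorUDT, so there is no built-in vector addition here.
encoded.groupBy("uuid").agg(spark_sum("newsVec")).show()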
Starting from indexed, we can collect the column newsIndex as a list and transform it into a SparseVector using a udf.
To declare a sparse vector, we need the number of features and a list of tuples containing the position and the value. Because we are dealing with a categorical variable, the value will be 1.0 and the index will be the column newsIndex:
from pyspark.sql.functions import collect_list, max, lit, udf
from pyspark.ml.linalg import Vectors, VectorUDT

def encode(arr, length):
    # one (position, 1.0) pair per news index the user has read
    vec_args = length, [(x, 1.0) for x in arr]
    return Vectors.sparse(*vec_args)

encode_udf = udf(encode, VectorUDT())
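As a quick local sanity check (indices chosen arbitrarily for illustration), the helper produces the expected sparse vector:

# a 10-dimensional vector with 1.0 at positions 2 and 5
print(encode([2, 5], 10))   # (10,[2,5],[1.0,1.0])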
The number of features is max(newsIndex) + 1 (since StringIndexer starts at 0.0):
feats = indexed.agg(max(indexed["newsIndex"])).take(1)[0][0] + 1
Bringing it all together:
indexed.groupBy("uuid") \
       .agg(collect_list("newsIndex").alias("newsArr")) \
       .select("uuid", encode_udf("newsArr", lit(feats)).alias("OHE")) \
       .show(truncate=False)
+---------------+-----------------------------------------+
|uuid |OHE |
+---------------+-----------------------------------------+
|009092130698762|(24,[0],[1.0]) |
|010003000431538|(24,[0,3,15],[1.0,1.0,1.0]) |
|010720006581483|(24,[11],[1.0]) |
|010216216021063|(24,[10,22],[1.0,1.0]) |
|001436800277225|(24,[2,12,23],[1.0,1.0,1.0]) |
|011425002581540|(24,[1,5,9],[1.0,1.0,1.0]) |
|010156461231357|(24,[13,18],[1.0,1.0]) |
|011199797794333|(24,[7,8,17,19,20],[1.0,1.0,1.0,1.0,1.0])|
|011414545455156|(24,[4,6,14,21],[1.0,1.0,1.0,1.0]) |
|011337201765123|(24,[1,16],[1.0,1.0]) |
+---------------+-----------------------------------------+
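If you also need to map a vector position back to the original news id, the fitted StringIndexerModel keeps its labels in index order (a short aside, not part of the original answer):

# model is the fitted StringIndexerModel from the question; labels are ordered by index,
# so position i in the OHE vector corresponds to news id model.labels[i]
print(model.labels[70])   # "9428530" in the sample data above (newsIndex 70.0)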