I'd like to find an efficient method to create sparse vectors in PySpark using DataFrames.
Say we are given the following transactional input:
df = spark.createDataFrame([
    (0, "a"),
    (1, "a"),
    (1, "b"),
    (1, "c"),
    (2, "a"),
    (2, "b"),
    (2, "b"),
    (2, "b"),
    (2, "c"),
    (0, "a"),
    (1, "b"),
    (1, "b"),
    (2, "cc"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       a|
|  1|       b|
|  1|       c|
|  2|       a|
|  2|       b|
|  2|       b|
|  2|       b|
|  2|       c|
|  0|       a|
|  1|       b|
|  1|       b|
|  2|      cc|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+
Summarized by id and category:
df.groupBy(df["id"],df["category"]).count().show()
+---+--------+-----+
| id|category|count|
+---+--------+-----+
|  1|       b|    3|
|  1|       a|    1|
|  1|       c|    1|
|  2|      cc|    1|
|  2|       c|    1|
|  2|       a|    1|
|  2|       b|    3|
|  0|       a|    2|
+---+--------+-----+
My aim is to get this output by id:
+---+-----------------------------------------------+
| id|                                       feature |
+---+-----------------------------------------------+
|  2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|
+---+-----------------------------------------------+
Could you please point me in the right direction? With MapReduce in Java this seemed much easier to me.
A sparse vector stores only its non-zero entries in order to save space. It has two parallel arrays: one for the indices and one for the values.
A sparse vector is a vector having a relatively small number of nonzero elements.
Each sparse vector will consist of a number of index-value pairs, where the first number in each pair is an integer representing the index (location), and the second number is a floating-point number representing the actual value. You may assume all index locations are non-negative.
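To make the index-value layout concrete, here is a minimal plain-Python sketch of the two parallel arrays. The helper names `to_sparse_pairs` and `to_dense` are hypothetical and exist only for illustration; they are not part of PySpark.

```python
# Hypothetical helpers for illustration only; not part of PySpark.
def to_sparse_pairs(dense):
    """Return (size, indices, values) for the non-zero entries of a dense list."""
    indices = [i for i, x in enumerate(dense) if x != 0]
    values = [float(dense[i]) for i in indices]
    return len(dense), indices, values


def to_dense(size, indices, values):
    """Rebuild the dense list from the two parallel arrays."""
    dense = [0.0] * size
    for i, x in zip(indices, values):
        dense[i] = x
    return dense


size, indices, values = to_sparse_pairs([1.0, 0.0, 3.0, 0.0, 0.0])
print(size, indices, values)  # 5 [0, 2] [1.0, 3.0]
```

PySpark's `SparseVector(size, indices, values)` takes the same three pieces of information, which is why it prints as `(5,[0,2],[1.0,3.0])`.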
This can be done pretty easily with pivot and VectorAssembler. Replace aggregation with pivot:
pivoted = df.groupBy("id").pivot("category").count().na.fill(0)
and assemble:
from pyspark.ml.feature import VectorAssembler
# Use every pivoted column except the "id" key as a feature
input_cols = [x for x in pivoted.columns if x != "id"]
result = (VectorAssembler(inputCols=input_cols, outputCol="features")
    .transform(pivoted)
    .select("id", "features"))
The result is shown below. For each row, VectorAssembler picks the more compact representation (sparse or dense) depending on how sparse the row is:
+---+---------------------+
|id |features             |
+---+---------------------+
|0  |(5,[1],[2.0])        |
|5  |(5,[0,3],[5.0,1.0])  |
|1  |[1.0,1.0,3.0,1.0,0.0]|
|3  |(5,[0,1],[3.0,1.0])  |
|2  |[2.0,1.0,3.0,1.0,1.0]|
|4  |(5,[0,1],[4.0,1.0])  |
+---+---------------------+
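Why some rows above come out sparse and others dense can be sketched with a simple storage estimate. The cost model below (roughly 8 bytes per dense entry versus 12 bytes per stored sparse entry, plus small fixed overheads) is an assumption about how Spark decides, not something stated in this thread, and `prefers_sparse` is a hypothetical name.

```python
def prefers_sparse(size, nnz):
    """Return True when the sparse layout is estimated to be smaller.

    Assumed cost model (not taken from this thread):
      dense  ~ 8 * size + 8 bytes
      sparse ~ 12 * nnz + 20 bytes
    """
    return 12 * nnz + 20 < 8 * size + 8


# For vectors of size 5, list which layout the estimate favours.
for nnz in range(6):
    print(nnz, "sparse" if prefers_sparse(5, nnz) else "dense")
```

Under this assumption, a size-5 vector with one or two non-zero entries stays sparse, while one with four or five becomes dense, which agrees with the table above.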
You can of course still convert everything to a single representation:
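import numpy as np
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.functions import udf

def to_sparse(c):
    def to_sparse_(v):
        # Already sparse: return unchanged
        if isinstance(v, SparseVector):
            return v
        # Dense: keep only the non-zero positions and their values
        vs = v.toArray()
        nonzero = np.nonzero(vs)[0]
        return SparseVector(v.size, nonzero, vs[nonzero])
    return udf(to_sparse_, VectorUDT())(c)

# Apply the UDF to the assembled column
result.withColumn("features", to_sparse(result["features"])).show(truncate=False)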
+---+-------------------------------------+
|id |features                             |
+---+-------------------------------------+
|0  |(5,[1],[2.0])                        |
|5  |(5,[0,3],[5.0,1.0])                  |
|1  |(5,[0,1,2,3],[1.0,1.0,3.0,1.0])      |
|3  |(5,[0,1],[3.0,1.0])                  |
|2  |(5,[0,1,2,3,4],[2.0,1.0,3.0,1.0,1.0])|
|4  |(5,[0,1],[4.0,1.0])                  |
+---+-------------------------------------+
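For intuition, the pivot-then-assemble approach amounts to counting categories per id and laying the counts out in a fixed column order. The following plain-Python sketch mimics that on the question's input without Spark; the names `rows`, `categories`, and `sparse_row` are illustrative, and the sorted category order is an assumption made to keep the column order fixed.

```python
from collections import Counter, defaultdict

rows = [(0, "a"), (1, "a"), (1, "b"), (1, "c"), (2, "a"), (2, "b"),
        (2, "b"), (2, "b"), (2, "c"), (0, "a"), (1, "b"), (1, "b"),
        (2, "cc"), (3, "a"), (4, "a"), (5, "c")]

# Fixed column order, playing the role of the pivoted column names.
categories = sorted({cat for _, cat in rows})

# Equivalent of grouping by id and counting each category.
counts = defaultdict(Counter)
for id_, cat in rows:
    counts[id_][cat] += 1


def sparse_row(counter):
    """Return (size, indices, values) for one id's category counts."""
    pairs = [(i, float(counter[c])) for i, c in enumerate(categories) if counter[c]]
    return len(categories), [i for i, _ in pairs], [v for _, v in pairs]


features = {id_: sparse_row(c) for id_, c in sorted(counts.items())}
print(categories)   # ['a', 'b', 'c', 'cc']
print(features[2])  # (4, [0, 1, 2, 3], [1.0, 3.0, 1.0, 1.0])
```

The index positions refer to the sorted category list, so index 1 with value 3.0 for id 2 corresponds to `b: 3.0` in the output the question asks for.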