Sparse Vector in PySpark

I'd like to find an efficient method to create sparse vectors in PySpark using DataFrames.

Let's say we're given the following transactional input:

df = spark.createDataFrame([
    (0, "a"),
    (1, "a"),
    (1, "b"),
    (1, "c"),
    (2, "a"),
    (2, "b"),
    (2, "b"),
    (2, "b"),
    (2, "c"),
    (0, "a"),
    (1, "b"),
    (1, "b"),
    (2, "cc"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       a|
|  1|       b|
|  1|       c|
|  2|       a|
|  2|       b|
|  2|       b|
|  2|       b|
|  2|       c|
|  0|       a|
|  1|       b|
|  1|       b|
|  2|      cc|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+

In aggregated form (row order may vary):

df.groupBy(df["id"],df["category"]).count().show()
+---+--------+-----+
| id|category|count|
+---+--------+-----+
|  0|       a|    2|
|  1|       a|    1|
|  1|       b|    3|
|  1|       c|    1|
|  2|       a|    1|
|  2|       b|    3|
|  2|       c|    1|
|  2|      cc|    1|
|  3|       a|    1|
|  4|       a|    1|
|  5|       c|    1|
+---+--------+-----+

My aim is to get this output by id:

+---+-----------------------------------------------+
| id|                                       feature |
+---+-----------------------------------------------+
|  2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|
+---+-----------------------------------------------+

Could you please point me in the right direction? With MapReduce in Java this seemed much easier to me.

asked May 05 '17 by ponthu

People also ask

What is a sparse vector in Spark?

A sparse vector is used to store only the non-zero entries, saving space. It has two parallel arrays: one for indices and one for values.

What are sparse vectors?

A sparse vector is a vector having a relatively small number of nonzero elements.

How do you represent a sparse vector?

Each sparse vector will consist of a number of index-value pairs, where the first number in each pair is an integer representing the index (location), and the second number is a floating-point number representing the actual value. You may assume all index locations are non-negative.
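To make the index-value-pair description concrete, here is a minimal illustration using pyspark.ml.linalg (the same vector type the answer below produces):

from pyspark.ml.linalg import SparseVector

# a length-4 vector with nonzero entries only at indices 1 and 3
sv = SparseVector(4, [1, 3], [3.0, 1.0])
print(sv)            # (4,[1,3],[3.0,1.0])
print(sv.toArray())  # [0. 3. 0. 1.]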


1 Answer

This can be done pretty easily with pivot and VectorAssembler. Replace the aggregation with a pivot:

 pivoted = df.groupBy("id").pivot("category").count().na.fill(0)
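With the input above, the pivoted frame should look roughly like this (row order may differ):

pivoted.show()
+---+---+---+---+---+
| id|  a|  b|  c| cc|
+---+---+---+---+---+
|  0|  2|  0|  0|  0|
|  1|  1|  3|  1|  0|
|  2|  1|  3|  1|  1|
|  3|  1|  0|  0|  0|
|  4|  1|  0|  0|  0|
|  5|  0|  0|  1|  0|
+---+---+---+---+---+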

and assemble:

from pyspark.ml.feature import VectorAssembler

# keep all pivoted category columns, excluding the "id" column itself
input_cols = [x for x in pivoted.columns if x != "id"]

result = (VectorAssembler(inputCols=input_cols, outputCol="features")
    .transform(pivoted)
    .select("id", "features"))

with the result as follows (row order may differ). VectorAssembler chooses the more efficient representation, dense or sparse, per row depending on how many entries are nonzero:

+---+-----------------+
|id |features         |
+---+-----------------+
|0  |(4,[0],[2.0])    |
|5  |(4,[2],[1.0])    |
|1  |[1.0,3.0,1.0,0.0]|
|3  |(4,[0],[1.0])    |
|2  |[1.0,3.0,1.0,1.0]|
|4  |(4,[0],[1.0])    |
+---+-----------------+
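If you want to confirm which concrete type each row ended up with, a quick check (not part of the original answer) is:

result.rdd.map(lambda row: (row.id, type(row.features).__name__)).collect()
# e.g. [(0, 'SparseVector'), (1, 'DenseVector'), ...]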

but of course you can still convert it to a single representation:

from pyspark.sql.functions import udf
from pyspark.ml.linalg import SparseVector, VectorUDT
import numpy as np

def to_sparse(c):
    # convert dense vectors in column c to SparseVector,
    # passing through vectors that are already sparse
    def to_sparse_(v):
        if isinstance(v, SparseVector):
            return v
        vs = v.toArray()
        nonzero = np.nonzero(vs)[0]
        return SparseVector(v.size, nonzero, vs[nonzero])
    return udf(to_sparse_, VectorUDT())(c)
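For example, applied to the assembled result from above:

result.withColumn("features", to_sparse("features")).show(truncate=False)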
+---+-------------------------------+
|id |features                       |
+---+-------------------------------+
|0  |(4,[0],[2.0])                  |
|5  |(4,[2],[1.0])                  |
|1  |(4,[0,1,2],[1.0,3.0,1.0])      |
|3  |(4,[0],[1.0])                  |
|2  |(4,[0,1,2,3],[1.0,3.0,1.0,1.0])|
|4  |(4,[0],[1.0])                  |
+---+-------------------------------+
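If you also want to map vector indices back to the category names (as in the {a: 1.0, b: 3.0, ...} notation from the question), each feature's index is simply its position in input_cols; a minimal sketch using the names above:

index_to_category = dict(enumerate(input_cols))
# e.g. {0: 'a', 1: 'b', 2: 'c', 3: 'cc'} for the example data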
answered Sep 30 '22 by zero323