
How to add two Sparse Vectors in Spark using Python

I've searched everywhere but couldn't find how to add two sparse vectors in Spark using Python. I want to add two sparse vectors like this:

(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})

(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})
asked Oct 07 '15 by Nick


People also ask

What is a sparse vector in Spark?

A sparse vector is used for storing non-zero entries to save space. It has two parallel arrays: one for indices and one for values.
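
For example, a minimal PySpark sketch showing the two parallel arrays (the values in the comments are what I'd expect, as an illustration):

from pyspark.mllib.linalg import Vectors

# A vector of size 10 with non-zero entries at indices 2 and 5
v = Vectors.sparse(10, [2, 5], [1.0, 3.0])
print(v.indices)  ## [2 5]   -- indices of the non-zero entries
print(v.values)   ## [1. 3.] -- the corresponding values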

What is a sparse vector in Python?

A sparse vector is a vector whose entries are almost all zero, like [1, 0, 0, 0, 0, 0, 0, 2, 0]. Storing all those zeros wastes memory, and dictionaries are commonly used to keep track of just the nonzero entries.
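
As a plain-Python illustration of that dictionary idea (a sketch, not library code):

dense = [1, 0, 0, 0, 0, 0, 0, 2, 0]
# Keep only the nonzero entries, keyed by position
sparse = {i: x for i, x in enumerate(dense) if x != 0}
print(sparse)  ## {0: 1, 7: 2}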

How do you represent a sparse vector?

One possibility is to represent the elements of a sparse vector as a linked list of nodes, each of which contains an integer index, a numerical value, and a pointer to the next node.
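
A minimal sketch of that linked-list representation (class and field names are illustrative):

class Node:
    """One nonzero entry of a sparse vector."""
    def __init__(self, index, value, next_node=None):
        self.index = index     # integer position in the vector
        self.value = value     # numerical value at that position
        self.next = next_node  # pointer to the next nonzero entry

# (0, 0, 1.0, 0, 2.0) stored as a two-node list
head = Node(2, 1.0, Node(4, 2.0))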

What are sparse vectors?

A sparse vector is a vector having a relatively small number of nonzero elements. Consider the following as an example of a sparse vector x with n = 11 elements: (0.0, 0.0, 1.0, 0.0, 2.0, 3.0, 0.0, 4.0, 0.0, 5.0, 0.0).
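
In storage, only the nonzero entries of x need to be kept, for example as two parallel arrays (a sketch):

# Nonzero entries of x = (0.0, 0.0, 1.0, 0.0, 2.0, 3.0, 0.0, 4.0, 0.0, 5.0, 0.0)
indices = [2, 4, 5, 7, 9]            # positions of the nonzero elements
values = [1.0, 2.0, 3.0, 4.0, 5.0]   # the elements themselves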


2 Answers

Something like this should work:

from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector
import numpy as np

def add(v1, v2):
    """Add two sparse vectors
    >>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
    >>> v2 = Vectors.sparse(3, {1: 1.0})
    >>> add(v1, v2)
    SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size 
    # Compute union of indices
    indices = set(v1.indices).union(set(v2.indices))
    # Not particularly efficient but we are limited by SPARK-10973
    # Create index: value dicts
    v1d = dict(zip(v1.indices, v1.values))
    v2d = dict(zip(v2.indices, v2.values))
    zero = np.float64(0)
    # Create dictionary index: (v1[index] + v2[index])
    values = {i: v1d.get(i, zero) + v2d.get(i, zero)
              for i in indices
              if v1d.get(i, zero) + v2d.get(i, zero) != zero}

    return Vectors.sparse(v1.size, values)

If you prefer a single pass and don't care about introduced explicit zeros, you can modify the above code like this:

from collections import defaultdict

def add(v1, v2):
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    values = defaultdict(float) # Dictionary with default value 0.0
    # Add values from v1
    for i in range(v1.indices.size):
        values[v1.indices[i]] += v1.values[i]
    # Add values from v2
    for i in range(v2.indices.size):
        values[v2.indices[i]] += v2.values[i]
    return Vectors.sparse(v1.size, dict(values))
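
For example (note the explicit zero that the single-pass version keeps):

v1 = Vectors.sparse(3, {0: 1.0, 2: 2.0})
v2 = Vectors.sparse(3, {0: -1.0})
add(v1, v2)
## SparseVector(3, {0: 0.0, 2: 2.0})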

If you want, you can try to monkey-patch SparseVector:

SparseVector.__add__ = add
v1 = Vectors.sparse(5, {0: 1.0, 2: 3.0})
v2 = Vectors.sparse(5, {0: -3.0, 2: -3.0, 4: 10})
v1 + v2
## SparseVector(5, {0: -2.0, 4: 10.0})

Alternatively, you should be able to use scipy.sparse:

from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint

# Represent each SparseVector as a single-column CSC matrix:
# (values, (row_indices, col_indices)) with every entry in column 0
m1 = csc_matrix(
    (v1.values, (v1.indices, [0] * v1.numNonzeros())),
    shape=(v1.size, 1))

m2 = csc_matrix(
    (v2.values, (v2.indices, [0] * v2.numNonzeros())),
    shape=(v2.size, 1))

LabeledPoint(0, m1 + m2)
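
If you need a SparseVector back instead of a SciPy matrix, one possible conversion (a sketch, assuming Vectors is imported as above) is:

m = (m1 + m2).tocsc()  # single-column CSC: .indices are row indices, .data the values
result = Vectors.sparse(m.shape[0], dict(zip(m.indices.tolist(), m.data.tolist())))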
answered Oct 17 '22 by zero323


I had the same problem, but I wasn't able to get the other solutions to complete in less than several hours on a moderately sized dataset (~20M records, vector size = 10k).

So instead I took a related approach, which finished in just a few minutes:

import numpy as np
from pyspark.mllib.linalg import Vectors

def to_sparse(v):
    """Convert a dense NumPy array to a SparseVector."""
    values = {i: e for i, e in enumerate(v) if e != 0}
    return Vectors.sparse(v.size, values)

summed = rdd.aggregate(
    np.zeros(vector_size),
    lambda acc, v: acc + v.toArray(),  # fold each vector into a dense accumulator
    lambda a, b: a + b,                # merge per-partition accumulators
)
result = to_sparse(summed)  # aggregate returns a single array, so convert it once

The basic idea was to not build a sparse vector at every step of the reduce, but only once at the end, and let NumPy do all the vector-addition work. Even using an aggregateByKey, which needed to shuffle the dense vectors, it still took only a few minutes.
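
A sketch of that per-key variant (assuming a pair RDD of (key, SparseVector) named pair_rdd, which is not in the original answer):

summed_by_key = pair_rdd.aggregateByKey(
    np.zeros(vector_size),
    lambda acc, v: acc + v.toArray(),  # fold each vector into a dense accumulator
    lambda a, b: a + b,                # merge per-partition accumulators
).mapValues(to_sparse)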

answered Oct 17 '22 by bryanjj