Broadcast Annoy object in Spark (for nearest neighbors)?

Since Spark's MLlib doesn't have nearest-neighbors functionality, I'm trying to use Annoy for approximate nearest neighbors. I try to broadcast the Annoy object and pass it to the workers; however, it does not operate as expected.

Below is code for reproducibility (to be run in the PySpark shell). The problem shows up as the difference between the results from Annoy with and without Spark.

from annoy import AnnoyIndex
import random
random.seed(42)

f = 40
t = AnnoyIndex(f)  # length of the item vectors that will be indexed
allvectors = []
for i in xrange(20):
    v = [random.gauss(0, 1) for z in xrange(f)]
    t.add_item(i, v)
    allvectors.append((i, v))
t.build(10)  # 10 trees

# Use Annoy with Spark: broadcast the built index to the workers
# (sc is the SparkContext provided by the PySpark shell)
sparkvectors = sc.parallelize(allvectors)
bct = sc.broadcast(t)
neighbors = sparkvectors.map(lambda pair: bct.value.get_nns_by_vector(vector=pair[1], n=5))
print "Five closest neighbors for first vector with Spark:",
print neighbors.first()

# Use Annoy without Spark: query the index directly on the driver
print "Five closest neighbors for first vector without Spark:",
print t.get_nns_by_vector(vector=allvectors[0][1], n=5)

Observed output:

Five closest neighbors for first vector with Spark: None

Five closest neighbors for first vector without Spark: [0, 13, 12, 6, 4]

asked Feb 03 '16 by xenocyon

2 Answers

I've never used Annoy, but I am pretty sure that the package description explains what is going on here:

It also creates large read-only file-based data structures that are mmapped into memory so that many processes may share the same data.

Since it uses memory-mapped indexes, when you serialize the index and pass it to the workers, all the data is lost on the way.
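
The file-based sharing the description refers to also works outside Spark, and it is exactly what the fix below relies on. A minimal sketch using Annoy's save/load API (the filename index.ann is arbitrary):

t.save("index.ann")   # persist the index to disk

t2 = AnnoyIndex(f)    # fresh, empty index of the same dimension
t2.load("index.ann")  # mmaps the file; cheap and shareable across processes
print(t2.get_nns_by_vector(allvectors[0][1], 5))
## [0, 13, 12, 6, 4]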

Try something like this instead:

from pyspark import SparkFiles

t.save("index.ann")
sc.addFile("index.ann")  # addFile, not addPyFile: the index is data, not Python code

def find_neighbors(iterator):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))  # mmap the worker-local copy of the file
    return (t.get_nns_by_vector(vector=x[1], n=5) for x in iterator)

sparkvectors.mapPartitions(find_neighbors).first()
## [0, 13, 12, 6, 4]
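
mapPartitions is the right tool here: the index is loaded once per partition and then reused for every vector in it. For contrast, a hypothetical per-record version (my sketch, not from the answer) would reopen the file for every single element:

def nns_per_record(pair):
    t = AnnoyIndex(f)
    t.load(SparkFiles.get("index.ann"))  # reloaded on every record
    return t.get_nns_by_vector(vector=pair[1], n=5)

sparkvectors.map(nns_per_record).first()  # same result, more per-record overhead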
answered by zero323

Just in case anyone else is following along here like I was: you'll also need to import AnnoyIndex inside the function you pass to mapPartitions, or you'll still get pickling errors. Here's my completed example based on the above:

from annoy import AnnoyIndex

from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark import SparkConf

import random
random.seed(42)

f = 1024
t = AnnoyIndex(f)
allvectors = []
for i in range(100):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)
    allvectors.append((i, v))

t.build(10)
t.save("index.ann")

def find_neighbors(iterator):
    # Import inside the function so each worker resolves annoy locally
    # instead of trying to unpickle a driver-side reference.
    from annoy import AnnoyIndex
    ai = AnnoyIndex(f)
    ai.load(SparkFiles.get("index.ann"))
    return (ai.get_nns_by_vector(vector=x[1], n=5) for x in iterator)

with SparkContext(conf=SparkConf().setAppName("myannoy")) as sc:
    sc.addFile("index.ann")
    sparkvectors = sc.parallelize(allvectors)
    print(sparkvectors.mapPartitions(find_neighbors).first())
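
Two caveats worth adding (mine, not from the answers above): the inner import only fixes serialization, so the annoy package must still be installed on every worker node, since sc.addFile distributes the index file but not the library. And to inspect more than one result, swap first() for take(n) inside the with block, e.g.:

print(sparkvectors.mapPartitions(find_neighbors).take(5))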
answered by joe