Using Word2VecModel.transform() does not work in a map function

I have built a Word2Vec model using Spark and saved it to disk. Now I want to use it in other code as an offline model. I have loaded the model and used it to get the vector of a single word (e.g. "Hello"), and it works well. But I need to call it for many words in an RDD using map.

When I call model.transform() in a map function, it throws this error:

"It appears that you are attempting to reference SparkContext from a broadcast " Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The code:

from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel

sc = SparkContext('local[4]', appName='Word2Vec')

model = Word2VecModel.load(sc, "word2vecModel")

x = model.transform("Hello")
print(x[0])  # works fine and returns [0.234, 0.800, ...]

y = sc.parallelize([['Hello'], ['test']])
y.map(lambda w: model.transform(w[0])).collect()  # throws the error

I would really appreciate your help.

asked Dec 24 '15 by Saeed


1 Answer

This is expected behavior. Like other MLlib models, the Python object is just a wrapper around a Scala model, and the actual processing is delegated to its JVM counterpart. Since the Py4J gateway is not accessible on workers (see How to use Java/Scala function from an action or a transformation?), you cannot call a Java/Scala method from an action or transformation.

Typically, MLlib models provide a helper method that works directly on RDDs, but that is not the case here. Word2VecModel does provide a getVectors method, which returns a map from words to vectors, but unfortunately it is a JavaMap, so it won't work inside a transformation. You could try something like this:

from pyspark.mllib.linalg import DenseVector

vectors_ = model.getVectors()  # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
           for k in vectors_.keys()}
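
Unlike the model itself, the resulting vectors dictionary is a plain Python object, so it can be broadcast and used inside a transformation. A minimal sketch, reusing the example data from the question:

vectors_bd = sc.broadcast(vectors)
rdd = sc.parallelize([['Hello'], ['test']])
rdd.map(lambda ws: [vectors_bd.value.get(w) for w in ws]).collect()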

This works, but building the dictionary is extremely slow. Another option is to dump the object to disk in a form that can be consumed by Python, but that requires some tinkering with Py4J and is better avoided. Instead, let's read the model data back as a DataFrame:

lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")

which has the following structure:

lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## |    |-- element: float (containsNull = true)
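
Note that the vector column is a plain array&lt;float&gt;, not an MLlib Vector. If you need Vectors downstream, one option is to convert the column with a UDF. A sketch, assuming pyspark.mllib.linalg's VectorUDT is available in your Spark version (the helper name to_vector is hypothetical):

from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors, VectorUDT

# hypothetical helper: turn the array<float> column into a DenseVector
to_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
lookup_vec = lookup.withColumn("vector", to_vector("vector"))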

The lookup DataFrame can be used to map words to vectors, for example through a join:

from pyspark.sql.functions import col

words = sc.parallelize([('hello',), ('test',)]).toDF(["word"]).alias("words")

words.join(lookup, col("words.word") == col("lookup.word")).show()

## +-----+-----+--------------------+
## | word| word|              vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
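
The join keeps both word columns. To return a single word column alongside its vector, you could select just the columns you need, reusing the same aliases:

result = (words
    .join(lookup, col("words.word") == col("lookup.word"))
    .select(col("words.word"), col("lookup.vector")))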

If the data fits into driver/worker memory, you can collect it and map with a broadcast variable:

lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
rdd = sc.parallelize([['Hello'], ['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws]).collect()
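
Note that lookup_bd.value.get(w) returns None for out-of-vocabulary words, and the lookup is case-sensitive, so 'Hello' will miss if the model only stored lowercase tokens. A hypothetical fallback to a zero vector:

# assumes a non-empty model; the size is inferred from any stored vector
vector_size = len(next(iter(lookup_bd.value.values())))
rdd.map(lambda ws: [lookup_bd.value.get(w, [0.0] * vector_size) for w in ws]).collect()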
answered Oct 22 '22 by zero323