I have built a Word2Vec model using Spark and save it as a model. Now, I want to use it in another code as offline model. I have loaded the model and used it to present vector of a word (e.g. Hello) and it works well. But, I need to call it for many words in an RDD using map.
When I call model.transform() in a map function, it throws this error:
"It appears that you are attempting to reference SparkContext from a broadcast " Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
the code:
from pyspark import SparkContext
from pyspark.mllib.feature import Word2Vec
from pyspark.mllib.feature import Word2VecModel
sc = SparkContext('local[4]',appName='Word2Vec')
model=Word2VecModel.load(sc, "word2vecModel")
x= model.transform("Hello")
print(x[0]) # it works fine and returns [0.234, 0.800,....]
y=sc.parallelize([['Hello'],['test']])
y.map(lambda w: model.transform(w[0])).collect() #it throws the error
I will really appreciate your help.
It is an expected behavior. Like other MLlib
models Python object is just a wrapper around Scala model and actual processing is delegated to its JVM counterpart. Since Py4J gateway is not accessible on workers (see How to use Java/Scala function from an action or a transformation?) you cannot call Java / Scala method from an action or transformation.
Typically MLlib models provide a helper method which can work directly on RDDs but it is not the case here. Word2VecModel
provides getVectors
method which returns a map from words to vector but unfortunately it is a JavaMap
so it won't work inside transformation. You could try something like this:
from pyspark.mllib.linalg import DenseVector
vectors_ = model.getVectors() # py4j.java_collections.JavaMap
vectors = {k: DenseVector([x for x in vectors_.get(k)])
for k in vectors_.keys()}
to get Python dictionary but it will be extremely slow. Another option is to dump this object to disk in a form that can be consumed by Python but it requires some tinkering with Py4J and it is better to avoid this. Instead lets read model as a DataFrame:
lookup = sqlContext.read.parquet("path_to_word2vec_model/data").alias("lookup")
and we'll get a following structure:
lookup.printSchema()
## root
## |-- word: string (nullable = true)
## |-- vector: array (nullable = true)
## | |-- element: float (containsNull = true)
which can be used to map words to vectors for example through join
:
from pyspark.sql.functions import col
words = sc.parallelize([('hello', ), ('test', )]).toDF(["word"]).alias("words")
words.join(lookup, col("words.word") == col("lookup.word"))
## +-----+-----+--------------------+
## | word| word| vector|
## +-----+-----+--------------------+
## |hello|hello|[-0.030862354, -0...|
## | test| test|[-0.13154022, 0.2...|
## +-----+-----+--------------------+
If data fits into driver / worker memory you can try to collect and map with broadcast:
lookup_bd = sc.broadcast(lookup.rdd.collectAsMap())
rdd = sc.parallelize([['Hello'],['test']])
rdd.map(lambda ws: [lookup_bd.value.get(w) for w in ws])
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With