The documentation for spaCy 2.0 mentions that the developers have added support for pickling spaCy objects so that they can be used on a Spark cluster through PySpark. However, it gives no instructions on how to do this.
Can someone explain how I can pickle spaCy's English-language NE parser so that I can use it inside my UDFs?
This doesn't work:
from pyspark import cloudpickle
from spacy.lang.en import English

nlp = English()
pickled_nlp = cloudpickle.dumps(nlp)
Not really an answer, but the best workaround I've discovered:
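from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
import spacy

def get_entities_udf():
    def get_entities(text):
        # Load the model lazily, once per Python worker process.
        global nlp
        try:
            doc = nlp(unicode(text))  # Python 2; use str(text) on Python 3
        except NameError:
            # `nlp` is not defined yet on this worker: load it, then retry.
            nlp = spacy.load('en')
            doc = nlp(unicode(text))
        return [t.label_ for t in doc.ents]
    # The UDF returns a list of entity labels, i.e. an array of strings.
    res_udf = udf(get_entities, ArrayType(StringType()))
    return res_udf

documents_df = documents_df.withColumn('entities', get_entities_udf()('text'))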
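The workaround above depends on each Python worker process loading the model once and reusing it for later rows. Here is a minimal sketch of that pattern on its own, using only the standard library. `fake_load` is a hypothetical stand-in for `spacy.load`, and `get_pipeline` and `tokens` are illustrative names, not part of spaCy or PySpark. It also assumes Spark reuses Python workers between tasks, which I believe is the default.

```python
import functools


def fake_load(name):
    """Hypothetical stand-in for spacy.load(name); returns a callable 'pipeline'."""
    fake_load.calls += 1
    return lambda text: text.split()


fake_load.calls = 0  # counts how often the (expensive) load actually runs


@functools.lru_cache(maxsize=None)
def get_pipeline(name):
    # The loader runs only on the first call per name in this process;
    # later calls return the cached object.
    return fake_load(name)


def tokens(text):
    # Body of a would-be UDF: fetch the cached pipeline, then apply it.
    return get_pipeline("en")(text)


first = tokens("Apple buys a startup")
second = tokens("Spark runs on a cluster")
load_count = fake_load.calls
```

In a real job, `get_pipeline` would call `spacy.load` instead, and nothing spaCy-related would need to be pickled, because the UDF closure only refers to the function by name.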
This worked for my needs and seems to be very quick (adapted from end of discussion here):
# create class to wrap spacy object
class SpacyMagic(object):
    """
    Simple Spacy Magic to minimize loading time.
    >>> SpacyMagic.get("en")
    <spacy.en.English ...
    """
    _spacys = {}

    @classmethod
    def get(cls, lang):
        if lang not in cls._spacys:
            import spacy
            cls._spacys[lang] = spacy.load(lang, disable=['parser', 'tagger', 'ner'])
        return cls._spacys[lang]
# `sc` is the active SparkContext
from pyspark.sql import functions as F, types as T

# broadcast `nlp` object as `nlp_br`
nlp_br = sc.broadcast(SpacyMagic.get('en_core_web_lg'))

# returns the averaged word vector for each phrase or word `x` as a list of floats
def get_vector(x):
    return nlp_br.value(x).vector.tolist()

get_vector_udf = F.udf(get_vector, T.ArrayType(T.FloatType()))

# create new column with word2vec vectors
new_df = df.withColumn('w2v_vectors', get_vector_udf(F.col('textColumn')))
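The broadcast above pickles the whole loaded pipeline and ships it to every executor. A possible alternative, sketched below with the standard library only, is to pickle a small handle that carries just the model name and loads the model on first use after unpickling. `LazyPipeline` and `fake_load` are hypothetical names for illustration; `fake_load` stands in for `spacy.load`. In a real job I would expect the class to need to live in a module that is importable on the executors.

```python
import pickle


def fake_load(name):
    """Hypothetical stand-in for spacy.load(name): keeps capitalized words."""
    return lambda text: [w for w in text.split() if w[:1].isupper()]


class LazyPipeline(object):
    """Picklable handle that carries only a model name, not the model."""

    def __init__(self, name):
        self.name = name
        self._model = None  # loaded on first use

    def __call__(self, text):
        if self._model is None:
            self._model = fake_load(self.name)
        return self._model(text)

    def __getstate__(self):
        # Drop the loaded model so only the small handle is serialized.
        state = self.__dict__.copy()
        state["_model"] = None
        return state


handle = LazyPipeline("en")
handle("Warm Up")                  # loads the model in this process
payload = pickle.dumps(handle)     # serializes the name only
restored = pickle.loads(payload)   # arrives with no model attached
result = restored("Alice met Bob in Paris")  # reloads on first call
```

The trade-off is one model load per worker process instead of one large transfer per executor, so which is quicker probably depends on model size and cluster layout.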