
Python - Pickle Spacy for PySpark

The documentation for Spacy 2.0 mentions that the developers have added functionality to allow Spacy to be pickled so that it can be used by a Spark cluster interfaced through PySpark; however, they don't give instructions on how to do this.

Can someone explain how I can pickle Spacy's English-language NE parser for use inside my udf functions?

This doesn't work:

from pyspark import cloudpickle
from spacy.lang.en import English  # needed: English was otherwise undefined

nlp = English()
pickled_nlp = cloudpickle.dumps(nlp)
Chris C asked Jun 15 '18 17:06

2 Answers

Not really an answer, but the best workaround I've discovered:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
import spacy

def get_entities_udf():
    def get_entities(text):
        global nlp
        try:
            doc = nlp(unicode(text))  # `unicode` is Python 2; use `str` on Python 3
        except NameError:
            # first call on this executor: load the model once, then reuse it
            nlp = spacy.load('en')
            doc = nlp(unicode(text))
        return [t.label_ for t in doc.ents]
    # the UDF returns a list of strings, so the type is ArrayType(StringType())
    res_udf = udf(get_entities, ArrayType(StringType()))
    return res_udf

documents_df = documents_df.withColumn('entities', get_entities_udf()('text'))
Chris C answered Oct 01 '22 19:10

This worked for my needs and seems to be very quick (adapted from end of discussion here):

# create class to wrap spacy object
class SpacyMagic(object):
    """
    Simple Spacy Magic to minimize loading time.
    >>> SpacyMagic.get("en")
    <spacy.en.English ...
    """
    _spacys = {}

    @classmethod
    def get(cls, lang):
        if lang not in cls._spacys:
            import spacy
            cls._spacys[lang] = spacy.load(lang, disable=['parser', 'tagger', 'ner'])
        return cls._spacys[lang]

# assumes an active SparkContext `sc`, plus:
#   import pyspark.sql.functions as F
#   from pyspark.sql import types as T

# broadcast `nlp` object as `nlp_br`
nlp_br = sc.broadcast( SpacyMagic.get('en_core_web_lg') )

# returns a list of word2vec vectors for each phrase or word `x`
def get_vector(x):
    return nlp_br.value(x).vector.tolist()

get_vector_udf = F.udf( get_vector, T.ArrayType( T.FloatType() ) )

# create new column with word2vec vectors
new_df = df.withColumn( 'w2v_vectors', get_vector_udf( F.col('textColumn') ) )
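SpacyMagic's class-level `_spacys` dict is the same memoization idea packaged as a class, which avoids module globals and works from any call site. A spaCy-free sketch of that shape (the `_load` stub is hypothetical, standing in for `spacy.load`):

```python
class ResourceMagic(object):
    """Class-level cache in the spirit of SpacyMagic above.

    `_load` is a hypothetical stand-in for spacy.load so the
    pattern runs without spaCy installed.
    """
    _cache = {}
    _load_count = 0

    @classmethod
    def _load(cls, key):
        # pretend this is an expensive model load
        cls._load_count += 1
        return ("model", key)

    @classmethod
    def get(cls, key):
        # load once per process, then serve from the class-level cache
        if key not in cls._cache:
            cls._cache[key] = cls._load(key)
        return cls._cache[key]
```

Because the cache lives on the class, every UDF invocation in the same executor process shares one loaded model, just as with `SpacyMagic.get('en_core_web_lg')`.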
scottlittle answered Oct 01 '22 20:10