 

Python: Spacy and memory consumption

1 - THE PROBLEM

I'm using "spacy" on python for text documents lemmatization. There are 500,000 documents having size up to 20 Mb of clean text.

The problem is the following: spaCy's memory consumption keeps growing over time until all of the memory is used.

2 - BACKGROUND

My hardware configuration: CPU: Intel i7-8700K 3.7 GHz (12 cores), memory: 16 GB, SSD: 1 TB. The GPU is onboard but not used for this task.

I'm using "multiprocessing" to split the task among several processes (workers). Each worker receives a list of documents to process. The main process performs monitoring of child processes. I initiate "spacy" in each child process once and use this one spacy instance to handle the whole list of documents in the worker.

Memory tracing says the following:

[ Memory trace - Top 10 ]

/opt/develop/virtualenv/lib/python3.6/site-packages/thinc/neural/mem.py:68: size=45.1 MiB, count=99, average=467 KiB

/opt/develop/virtualenv/lib/python3.6/posixpath.py:149: size=40.3 MiB, count=694225, average=61 B

:487: size=9550 KiB, count=77746, average=126 B

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:33: size=7901 KiB, count=6, average=1317 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_nouns.py:7114: size=5273 KiB, count=57494, average=94 B

prepare_docs04.py:372: size=4189 KiB, count=1, average=4189 KiB

/opt/develop/virtualenv/lib/python3.6/site-packages/dawg_python/wrapper.py:93: size=3949 KiB, count=5, average=790 KiB

/usr/lib/python3.6/json/decoder.py:355: size=1837 KiB, count=20456, average=92 B

/opt/develop/virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_adjectives.py:2828: size=1704 KiB, count=20976, average=83 B

prepare_docs04.py:373: size=1633 KiB, count=1, average=1633 KiB

3 - EXPECTATIONS

I have seen a good recommendation to build a separate server-client solution here: "Is possible to keep spacy in memory to reduce the load time?"

Is it possible to keep memory consumption under control with the multiprocessing approach?

4 - THE CODE

Here is a simplified version of my code:

import gc, os, subprocess, spacy, sys, tracemalloc
from multiprocessing import Pipe, Process, Lock
from time import sleep

# START: memory trace
tracemalloc.start()

# Load spacy
spacyMorph = spacy.load("en_core_web_sm")

#
# Get word's lemma
#
def getLemma(word):
    global spacyMorph
    lemmaOutput = spacyMorph(str(word))
    return lemmaOutput


#
# Worker's logic
#
def workerNormalize(lock, conn, params):
    documentCount = 1
    for filenameRaw in params[1]:
        documentTotal = len(params[1])
        documentID = int(os.path.basename(filenameRaw).split('.')[0])

        # Send the worker's current progress to the main process
        statusMessage = "WORKING:{:d},{:d},".format(documentID, documentCount)
        documentCount += 1
        if lock is not None:
            lock.acquire()
            try:
                conn.send(statusMessage)
            finally:
                lock.release()
        else:
            print(statusMessage)

        # ----------------
        # Some code is excluded for clarity's sake
        # I've got a "wordList" from file "filenameRaw"
        # ----------------

        wordCount = 1
        wordTotalCount = len(wordList)

        for word in wordList:
            lemma = getLemma(word)
            wordCount += 1

        # ----------------
        # Then I collect all lemmas and save it to another text file
        # ----------------

        # Here I'm trying to reduce memory usage
        del wordList
        del word
        gc.collect()


if __name__ == '__main__':
    lock = Lock()
    processList = []

    # ----------------
    # Some code is excluded for clarity's sake
    # Here I'm getting full list of files "fileTotalList" which I need to lemmatize
    # ----------------
    while cursorEnd < (docTotalCount + stepSize):
        fileList = fileTotalList[cursorStart:cursorEnd]

        # ----------------
        # Create workers and populate it with list of files to process
        # ----------------
        processData = {}
        processData['total'] = len(fileList)  # worker total progress
        processData['count'] = 0  # worker documents done count
        processData['currentDocID'] = 0  # current document ID the worker is working on
        processData['comment'] = ''  # additional comment (optional)
        processData['con_parent'], processData['con_child'] = Pipe(duplex=False)
        processName = 'worker ' + str(count) + " at " + str(cursorStart)
        processData['handler'] = Process(target=workerNormalize, name=processName, args=(lock, processData['con_child'], [processName, fileList]))

        processList.append(processData)
        processData['handler'].start()

        cursorStart = cursorEnd
        cursorEnd += stepSize
        count += 1

    # ----------------
    # Run the monitor to look after the workers
    # ----------------
    while True:
        runningCount = 0

        #Worker communication format:
        #STATUS:COMMENTS

        #STATUS:
        #- WORKING - worker is working
        #- CLOSED - worker has finished its job and closed the pipe connection

        #COMMENTS:
        #- for WORKING status:
        #DOCID,COUNT,COMMENTS
        #DOCID - current document ID the worker is working on
        #COUNT - count of done documents
        #COMMENTS - additional comments (optional)


        # ----------------
        # Run through the list of workers ...
        # ----------------
        for i, process in enumerate(processList):
            if process['handler'].is_alive():
                runningCount += 1

                # ----------------
                # .. and check if there is something in the PIPE
                # ----------------
                if process['con_parent'].poll():
                    try:
                        message = process['con_parent'].recv()
                        status = message.split(':')[0]
                        comment = message.split(':')[1]

                        # ----------------
                        # Some code is excluded for clarity's sake
                        # Update worker's information and progress in "processList"
                        # ----------------

                    except EOFError:
                        print("EOF----")

                # ----------------
                # Some code is excluded for clarity's sake
                # Here I draw some progress lines per workers
                # ----------------

            else:
                # The worker has finished its job. Close the connection.
                process['con_parent'].close()

        # If all workers have finished, stop monitoring
        if runningCount == 0:
            break

        # Wait for some time and monitor again
        sleep(PARAM['MONITOR_REFRESH_FREQUENCY'])


    print("================")
    print("**** DONE ! ****")
    print("================")

    # ----------------
    # Here I'm measuring memory usage to find the most "gluttonous" part of the code
    # ----------------
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    print("[ Memory trace - Top 10 ]")
    for stat in top_stats[:10]:
        print(stat)



asked Apr 25 '19 by VictorDDT


2 Answers

For people who land on this in the future, I found a hack that seems to work well:

import spacy
import en_core_web_lg
import multiprocessing

docs = ['Your documents']

def process_docs(docs, n_processes=None):
    # Load the model inside the subprocess, 
    # as that seems to be the main culprit of the memory issues
    nlp = en_core_web_lg.load()

    if not n_processes:
        n_processes = multiprocessing.cpu_count()

    processed_docs = [doc for doc in nlp.pipe(docs, disable=['ner', 'parser'], n_process=n_processes)]


    # Then do what you wish beyond this point. I end up writing results out to s3.
    pass

for x in range(10):
    # This will spin up a subprocess,
    # and every time it finishes it will release all resources back to the machine.
    with multiprocessing.Manager() as manager:
        p = multiprocessing.Process(target=process_docs, args=(docs,))
        p.start()
        p.join()

The idea here is to put everything Spacy-related into a subprocess so all the memory gets released once the subprocess finishes. I know it's working because I can actually watch the memory get released back to the instance every time the subprocess finishes (also the instance no longer crashes xD).
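Not part of the original answer, but as an extension of the same idea: if the corpus is too large to hand to a single subprocess, you can feed each subprocess its own chunk of documents, so each chunk's memory is released when that subprocess exits. The chunks() helper and the chunk size below are assumptions for illustration; process_docs is the function from the snippet above.

def chunks(items, size):
    # Hypothetical helper: yield consecutive slices of `items` of length `size`
    for i in range(0, len(items), size):
        yield items[i:i + size]

all_docs = ['Your documents']  # the full corpus (placeholder)
CHUNK_SIZE = 1000              # assumed chunk size; tune it for your memory budget

for chunk in chunks(all_docs, CHUNK_SIZE):
    # One subprocess per chunk; all spaCy memory is freed when the subprocess exits
    p = multiprocessing.Process(target=process_docs, args=(chunk,))
    p.start()
    p.join()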

Full disclosure: I have no idea why spaCy's memory usage seems to grow over time. I've read all over trying to find a simple answer, and all the GitHub issues I've seen claim the issue has been fixed, yet I still see this happening when I use spaCy on AWS SageMaker instances.

Hope this helps someone! I know I spent hours pulling my hair out over this.

Credit to another SO answer that explains a bit more about subprocesses in Python.

answered by mkerrig


Memory leaks with spacy

Memory problems when processing large amounts of data seem to be a known issue; see these relevant GitHub issues:

  • https://github.com/explosion/spaCy/issues/3623
  • https://github.com/explosion/spaCy/issues/3556

Unfortunately, it doesn't look like there's a good solution yet.

Lemmatization

Looking at your particular lemmatization task, I think your example code is a bit over-simplified, because you're running the full spacy pipeline on single words and then not doing anything with the results (not even inspecting the lemma?), so it's hard to tell what you actually want to do.

I'll assume you just want to lemmatize, so in general, you want to disable the parts of the pipeline that you're not using as much as possible (especially parsing if you're only lemmatizing, see https://spacy.io/usage/processing-pipelines#disabling) and use nlp.pipe to process documents in batches. Spacy can't handle really long documents if you're using the parser or entity recognition, so you'll need to break up your texts somehow (or for just lemmatization/tagging you can just increase nlp.max_length as much as you need).
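For the long-document case, here is a minimal sketch of splitting a text into chunks before piping it through; the character-based splitting and the chunk size are my own assumptions for illustration, not something spacy prescribes:

import spacy

nlp = spacy.load('en_core_web_sm', disable=['parser', 'ner'])
nlp.max_length = 2 * 10**7  # assumed value; raise it only if you just tag/lemmatize

def split_text(text, chunk_size=100000):
    # Naive character-based chunking for illustration;
    # splitting on paragraph or sentence boundaries would be cleaner.
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

long_text = "..."  # one of your ~20 MB documents
lemmas = []
for doc in nlp.pipe(split_text(long_text)):
    lemmas.extend(token.lemma_ for token in doc)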

Breaking documents into individual words as in your example kind of defeats the purpose of most of spacy's analysis (you often can't meaningfully tag or parse single words), and it's also going to be very slow to call spacy this way.

Lookup lemmatization

If you just need lemmas for common words out of context (where the tagger isn't going to provide any useful information), you can see if the lookup lemmatizer is good enough for your task and skip the rest of the processing:

from spacy.lemmatizer import Lemmatizer
from spacy.lang.en import LOOKUP
lemmatizer = Lemmatizer(lookup=LOOKUP)
print(lemmatizer(u"ducks", ''), lemmatizer(u"ducking", ''))

Output:

['duck'] ['duck']

It is just a static lookup table, so it won't do well on unknown words or on capitalization for words like "wugs" or "DUCKS"; you'll have to see if it works well enough for your texts, but it would be much, much faster and without memory leaks. (You could also just use the table yourself without spacy, it's here: https://github.com/michmech/lemmatization-lists.)
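If you do use the table directly, here's a minimal sketch, assuming the tab-separated "lemma<TAB>inflected form" layout of the lemmatization-en.txt file in that repository (check the actual file format before relying on this):

def load_lookup(path):
    # Build a form -> lemma dict from a tab-separated "lemma\tform" file
    table = {}
    with open(path, encoding='utf-8-sig') as f:  # utf-8-sig tolerates a BOM
        for line in f:
            parts = line.rstrip('\n').split('\t')
            if len(parts) == 2:
                lemma, form = parts
                table[form.lower()] = lemma
    return table

lookup = load_lookup('lemmatization-en.txt')
print(lookup.get('ducks', 'ducks'))  # falls back to the word itself if unknown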

Better lemmatization

Otherwise, use something more like this to process texts in batches:

nlp = spacy.load('en', disable=['parser', 'ner'])
# if needed: nlp.max_length = MAX_DOC_LEN_IN_CHAR
for doc in nlp.pipe(texts):
  for token in doc:
    print(token.lemma_)

If you process one long text (or use nlp.pipe() for lots of shorter texts) instead of processing individual words, you should be able to tag/lemmatize (many) thousands of words per second in one thread.
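To tie this back to your per-file workflow, here's a rough sketch of what a worker could do instead of lemmatizing word by word; the file reading/writing details and the batch size are assumptions about your setup:

import os
import spacy

nlp = spacy.load('en_core_web_sm', disable=['parser', 'ner'])

def iter_texts(file_list):
    # Yield the full text of each document instead of individual words
    for filename in file_list:
        with open(filename, encoding='utf-8') as f:
            yield f.read()

file_list = ['doc1.txt', 'doc2.txt']  # your fileTotalList
for filename, doc in zip(file_list, nlp.pipe(iter_texts(file_list), batch_size=20)):
    lemmas = ' '.join(token.lemma_ for token in doc)
    out_path = os.path.splitext(filename)[0] + '.lemmas.txt'
    with open(out_path, 'w', encoding='utf-8') as out:
        out.write(lemmas)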

answered by aab