Referring this post Multi-Threaded NLP with Spacy pipe which talks about that,
and here from https://spacy.io/
from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])
from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
# Multi-word expressions, such as names, dates etc
# can be merged into single tokens
for ent in doc.ents:
ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
# Efficient, lossless serialization --- all annotations
# saved, same size as uncompressed text
byte_string = doc.to_bytes()
I need to write up a proper blog post on this. The tl;dr is that spaCy is implemented in Cython, a Python-like language that transpiles into C or C++, and ultimately produces a Python extension. You can read more about releasing the GIL with Cython here:
http://docs.cython.org/src/userguide/parallelism.html
Here's the implementation of the .pipe method in spaCy:
https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135
def pipe(self, stream, int batch_size=1000, int n_threads=2):
cdef Pool mem = Pool()
cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
cdef Doc doc
cdef int i
cdef int nr_class = self.moves.n_moves
cdef int nr_feat = self.model.nr_feat
cdef int status
queue = []
for doc in stream:
doc_ptr[len(queue)] = doc.c
lengths[len(queue)] = doc.length
queue.append(doc)
if len(queue) == batch_size:
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
queue = []
batch_size = len(queue)
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
The actual mechanics of the multi-threading are super simple, because NLP is (often) embarrassingly parallel --- every document is parsed independently, so we just need to make a prange loop over a stream of texts.
Implementing the parser in a multi-threaded way was quite hard, though. To use multi-threading effectively, you need to release the GIL, and not reacquire it. This means making no use of Python objects, not raising exceptions, etc.
When you create a Python object --- let's say a list --- you need to increment its reference count, which is stored globally. This means acquiring the GIL. There's no way around that. But if you're in a C extension and you just want to, say, put an integer on the stack, or make a call to malloc or free, you don't need to acquire the GIL. So if you write the program at that level, using only C and C++ constructs, you can release the GIL.
I've been writing statistical parsers in Cython for a few years now. (Before spaCy I had an implementation for my academic research.) Getting the entire parsing loop written without the GIL was hard. By late 2015 I had the machine learning, hash table, outer parsing loop, and most of the feature extraction as nogil code. But the state object had a complicated interface, and was implemented as a cdef class. I couldn't create this object or store it in a container without acquiring the GIL.
The break-through came when I figured out an undocumented way to write a C++ class in Cython. This allowed me to hollow out the existing cdef class that controlled the parser state. I proxied its interface to the inner C++ class, method by method. This way I could keep the code working, and make sure I didn't introduce any subtle bugs into the feature calculation.
You can see the inner class here: https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/_state.pxd
If you navigate around the git history of this file, you can see the patches where I implemented the .pipe method.
Presumably it does the parsing at C level rather than at python level. Once you drop down into C, if don't need to access any python objects you can safely release the GIL. At the lowest level of reading and writing, CPython also releases the GIL. The reasoning being, if there are other threads running and we are about to call a blocking C function then we should release the GIL for the duration of the function call.
You can see this in action at CPython's lowest implementation of write.
if (gil_held) {
do {
Py_BEGIN_ALLOW_THREADS
errno = 0;
#ifdef MS_WINDOWS
n = write(fd, buf, (int)count);
#else
n = write(fd, buf, count);
#endif
/* save/restore errno because PyErr_CheckSignals()
* and PyErr_SetFromErrno() can modify it */
err = errno;
Py_END_ALLOW_THREADS
} while (n < 0 && err == EINTR &&
!(async_err = PyErr_CheckSignals()));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With