 

Pymongo multiprocessing

I have to do a lot of inserts and updates on a MongoDB database.

I'm trying to use multiprocessing to speed these tasks up, and I wrote this simple code to test it. My dummy data is:

documents = [{"a number": i} for i in range(1000000)]

Without multiprocessing:

import time
from pymongo import MongoClient

time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
    col.insert_one(doc)
time1f = time.time()
print(time1f - time1s)

I got 150 seconds.
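As an aside, a single-process bulk insert with insert_many (a minimal sketch reusing the client above; it has to run against a fresh collection, since insert_one adds an _id to each dict in documents, so reinserting them would raise duplicate key errors) avoids the round trip per document:

time1s = time.time()
col.insert_many(documents)  # one bulk call; PyMongo batches it internally
time1f = time.time()
print(time1f - time1s)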

With multiprocessing, I defined the following worker function, as described in PyMongo's FAQ (each process must create its own MongoClient):

def insert_doc(document):
    # Opens a fresh client (and socket) for every single insert.
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_one(document)

However, when I run my code:

import multiprocessing as mp

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)

I get an error:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 99] Cannot assign requested address

A total of 26447 documents were processed before the error was raised. The error is explained here, although the person who ran into it was not using multiprocessing. The solution there was to open just one MongoClient, but that's not possible when I want to do multiprocessing. Is there any workaround? Thanks for your help.

asked Dec 06 '22 by Vladimir Vargas


1 Answer

Your code creates a new MongoClient for each of the million documents in your example (just like the question you linked to). This requires a new socket for each new query and defeats PyMongo's connection pooling. Besides being extremely slow, it also means you open and close sockets faster than your TCP stack can keep up: you leave too many sockets in TIME_WAIT state, so you eventually run out of ephemeral ports.
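To put rough numbers on that (a Linux-specific sketch: the ephemeral port range is read from /proc, and the 60-second TIME_WAIT duration is a typical default, both assumptions rather than measurements):

# Each closed connection holds its port in TIME_WAIT, so the sustainable
# rate of short-lived connections is bounded by the ephemeral port range
# divided by the TIME_WAIT duration.
with open("/proc/sys/net/ipv4/ip_local_port_range") as f:
    low, high = map(int, f.read().split())

ports = high - low + 1   # e.g. 32768..60999 gives ~28k ports
time_wait = 60           # typical TIME_WAIT duration in seconds
print(f"~{ports // time_wait} short-lived connections/sec before ports run out")

At default settings that works out to only a few hundred new connections per second, which a 16-process insert loop easily exceeds.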

You can create fewer clients, and therefore open fewer sockets, if you insert large numbers of documents with each client:

import multiprocessing as mp
import time
from pymongo import MongoClient

documents = [{"a number": i} for i in range(1000000)]

def insert_doc(chunk):
    client = MongoClient()
    db = client.mydb
    col = db.mycol
    col.insert_many(chunk)

chunk_size = 10000

def chunks(sequence):
    # Yield the documents in slices of chunk_size at a time.
    for j in range(0, len(sequence), chunk_size):
        yield sequence[j:j + chunk_size]

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)
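A further refinement (a sketch, not part of the answer above: it reuses the chunks() helper and relies on multiprocessing's initializer hook) is to open one MongoClient per worker process and reuse it for every chunk, which is the one-client-per-process pattern PyMongo's FAQ recommends:

client = None  # set once in each worker process

def init_worker():
    # Runs once per worker: give the process its own client,
    # created after the fork, as PyMongo requires.
    global client
    client = MongoClient()

def insert_chunk(chunk):
    client.mydb.mycol.insert_many(chunk)

pool = mp.Pool(processes=16, initializer=init_worker)
pool.map(insert_chunk, chunks(documents))
pool.close()
pool.join()

With 16 workers this opens 16 clients in total, instead of one per chunk.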
answered Dec 28 '22 by A. Jesse Jiryu Davis