All documents in my collection in MongoDB have the same fields. My goal is to load them into Python into pandas.DataFrame
or dask.DataFrame
.
I'd like to speedup the loading procedure by parallelizing it. My plan is to spawn several processes or threads. Each process would load a chunk of a collection, then these chunks would be merged together.
How do I do it correctly with MongoDB?
I have tried similar approach with PostgreSQL. My initial idea was to use SKIP
and LIMIT
in SQL queries. It has failed, since each cursor, opened for each particular query, started reading data table from the beginning and just skipped specified amount of rows. So I had to create additional column, containing record numbers, and specify ranges of these numbers in queries.
On the contrary, MongoDB assigns unique ObjectID to each document. However, I've found that it is impossible to subtract one ObjectID from another, they can be only compared with ordering operations: less, greater and equal.
Also, pymongo
returns the cursor object, that supports indexing operation and has some methods, seeming useful for my task, like count
, limit
.
MongoDB connector for Spark accomplishes this task somehow. Unfortunately, I'm not familiar with Scala, therefore, it's hard for me to find out how they do it.
So, what is the correct way for parallel loading data from Mongo into python?
up to now, I've come to the following solution:
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed
# import other modules.
collection = get_mongo_collection()
cursor = collection.find({ })
def process_document(in_doc):
out_doc = # process doc keys and values
return pd.DataFrame(out_doc)
df = dd.from_delayed( (delayed(process_document)(d) for d in cursor) )
However, it looks like dask.dataframe.from_delayed
internally creates a list from passed generator, effectively loading all collection in a single thread.
Update. I've found in docs, that skip
method of pymongo.Cursor
starts from beginning of a collection too, as PostgreSQL. The same page suggests using pagination logic in the application. Solutions, that I've found so far, use sorted _id
for this. However, they also store last seen _id
, that implies that they also work in a single thread.
Update2. I've found the code of the partitioner in the official MongoDb Spark connector: https://github.com/mongodb/mongo-spark/blob/7c76ed1821f70ef2259f8822d812b9c53b6f2b98/src/main/scala/com/mongodb/spark/rdd/partitioner/MongoPaginationPartitioner.scala#L32
Looks like, initially this partitioner reads the key field from all documents in the collection and calculates ranges of values.
Update3: My incomplete solution.
Doesn't work, gets the exception from pymongo, because dask seems to incorrectly treat the Collection
object:
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/dask/delayed.pyc in <genexpr>(***failed resolving arguments***)
81 return expr, {}
82 if isinstance(expr, (Iterator, list, tuple, set)):
---> 83 args, dasks = unzip((to_task_dask(e) for e in expr), 2)
84 args = list(args)
85 dsk = sharedict.merge(*dasks)
/home/user/.conda/envs/MBA/lib/python2.7/site-packages/pymongo/collection.pyc in __next__(self)
2342
2343 def __next__(self):
-> 2344 raise TypeError("'Collection' object is not iterable")
2345
2346 next = __next__
TypeError: 'Collection' object is not iterable
What raises the exception:
def process_document(in_doc, other_arg):
# custom processing of incoming records
return out_doc
def compute_id_ranges(collection, query, partition_size=50):
cur = collection.find(query, {'_id': 1}).sort('_id', pymongo.ASCENDING)
id_ranges = [cur[0]['_id']]
count = 1
for r in cur:
count += 1
if count > partition_size:
id_ranges.append(r['_id'])
count = 0
id_ranges.append(r['_id'])
return zip(id_ranges[:len(id_ranges)-1], id_ranges[1: ])
def load_chunk(id_pair, collection, query={}, projection=None):
q = query
q.update( {"_id": {"$gte": id_pair[0], "$lt": id_pair[1]}} )
cur = collection.find(q, projection)
return pd.DataFrame([process_document(d, other_arg) for d in cur])
def parallel_load(*args, **kwargs):
collection = kwargs['collection']
query = kwargs.get('query', {})
projection = kwargs.get('projection', None)
id_ranges = compute_id_ranges(collection, query)
dfs = [ delayed(load_chunk)(ir, collection, query, projection) for ir in id_ranges ]
df = dd.from_delayed(dfs)
return df
collection = connect_to_mongo_and_return_collection_object(credentials)
# df = parallel_load(collection=collection)
id_ranges = compute_id_ranges(collection)
dedf = delayed(load_chunk)(id_ranges[0], collection)
load_chunk
perfectly runs when called directly. However, call delayed(load_chunk)( blah-blah-blah )
fails with exception, mentioned above.
To handle extensive Unstructured data, you can use the MongoDB Database. MongoDB Database can connect to web applications through any programming language like PHP, Python, Ruby, Scala, C, C++, etc.
I think dask-mongo
will do the work for here. You can install it with pip or conda, and in the repo you can find some examples in a notebook.
dask-mongo
will read the data you have in MongoDB as a Dask bag but then you can go from a Dask bag to a Dask Dataframe with df = b.to_dataframe()
where b
is the bag you read from mongo using with dask_mongo.read_mongo
I was looking into pymongo parallelization and this is what worked for me. It took my humble gaming laptop nearly 100 minutes to process my mongodb of 40 million documents. The CPU was 100% utilised I had to turn on the AC :)
I used skip and limit functions to split the database, then assigned batches to processes. The code is written for Python 3:
import multiprocessing
from pymongo import MongoClient
def your_function(something):
<...>
return result
def process_cursor(skip_n,limit_n):
print('Starting process',skip_n//limit_n,'...')
collection = MongoClient().<db_name>.<collection_name>
cursor = collection.find({}).skip(skip_n).limit(limit_n)
for doc in cursor:
<do your magic>
# for example:
result = your_function(doc['your_field'] # do some processing on each document
# update that document by adding the result into a new field
collection.update_one({'_id': doc['_id']}, {'$set': {'<new_field_eg>': result} })
print('Completed process',skip_n//limit_n,'...')
if __name__ == '__main__':
n_cores = 7 # number of splits (logical cores of the CPU-1)
collection_size = 40126904 # your collection size
batch_size = round(collection_size/n_cores+0.5)
skips = range(0, n_cores*batch_size, batch_size)
processes = [ multiprocessing.Process(target=process_cursor, args=(skip_n,batch_size)) for skip_n in skips]
for process in processes:
process.start()
for process in processes:
process.join()
The last split will have a larger limit than the remaining documents, but that won't raise an error
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With