
Python Shared Memory Dictionary for Mapping Big Data

I've been having a hard time using a large dictionary (~86GB, 1.75 billion keys) to process a big dataset (2TB) using multiprocessing in Python.

Context: a dictionary mapping strings to strings is loaded from pickled files into memory. Once loaded, worker processes (ideally >32) are created that must look up values in the dictionary but not modify its contents, in order to process the ~2TB dataset. The dataset needs to be processed in parallel; otherwise the task would take over a month.

Here are the nine approaches (all failing) that I have tried:

  1. Store the dictionary as a global variable in the Python program and then fork the ~32 worker processes (a minimal sketch of this pattern appears after this list). In theory this method might work, since the dictionary is not being modified and therefore the COW mechanism of fork on Linux would mean that the data structure is shared rather than copied among processes. However, when I attempt this, my program crashes on os.fork() inside of multiprocessing.Pool.map with OSError: [Errno 12] Cannot allocate memory. I'm convinced that this is because the kernel is configured to never overcommit memory (/proc/sys/vm/overcommit_memory is set to 2, and I can't change this setting on the machine since I don't have root access).

  2. Load the dictionary into a shared-memory dictionary with multiprocessing.Manager.dict (see the second sketch after this list). With this approach I was able to fork the 32 worker processes without crashing, but the subsequent data processing is orders of magnitude slower than another version of the task that required no dictionary (the only difference being the absence of dictionary lookups). I theorize that this is because of the inter-process communication between the manager process holding the dictionary and each worker process, which is required for every single dictionary lookup. Although the dictionary is not being modified, it is accessed many, many times, often simultaneously by many processes.

  3. Copy the dictionary into a C++ std::map and rely on Linux's COW mechanism to prevent it from being copied (like approach #1 except with the dictionary in C++). With this approach, it took a long time to load the dictionary into the std::map, and the program subsequently crashed from ENOMEM on os.fork() just as before.

  4. Copy the dictionary into pyshmht. It takes far too long to copy the dictionary into pyshmht.

  5. Try using SNAP's HashTable. The underlying implementation in C++ allows for it to be made and used in shared memory. Unfortunately the Python API does not offer this functionality.

  6. Use PyPy. The crash still happened, as in #1.

  7. Implement my own shared-memory hash table in Python on top of multiprocessing.Array. This approach still resulted in the out-of-memory error that occurred in #1.

  8. Dump the dictionary into dbm. After trying to dump the dictionary into a dbm database for four days and seeing an ETA of "33 days", I gave up on this approach.

  9. Dump the dictionary into Redis. When I try to dump the dictionaries (the 86GB dict is loaded from 1024 smaller dicts) into Redis using redis.mset, I get a connection-reset-by-peer error. When I try to dump the key-value pairs one at a time in a loop, it takes an extremely long time.
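
For reference, here is a minimal sketch of the pattern from approach #1 (the data is a stand-in; the real dictionary is loaded from 1024 pickled files):

import multiprocessing as mp

# Stand-in for the ~86GB dictionary loaded from pickled files.
big_dict = {'foo': 'bar', 'key': 'value'}

def worker(key):
    # Read-only lookup against the global inherited via fork(); with
    # copy-on-write, the dictionary's pages should be shared, not copied.
    return big_dict.get(key)

if __name__ == '__main__':
    with mp.Pool(processes=32) as pool:
        # At full scale, the os.fork() inside Pool() is where
        # OSError: [Errno 12] Cannot allocate memory is raised when
        # /proc/sys/vm/overcommit_memory is 2.
        print(pool.map(worker, ['foo', 'key']))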

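And a corresponding sketch of approach #2 with multiprocessing.Manager (again with stand-in data):

import multiprocessing as mp

def worker(args):
    shared, key = args
    # Each lookup is an IPC round-trip to the manager process, which is
    # why this approach is orders of magnitude slower than a plain dict.
    return shared.get(key)

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared = manager.dict({'foo': 'bar', 'key': 'value'})
        with mp.Pool(processes=32) as pool:
            print(pool.map(worker, [(shared, k) for k in ['foo', 'key']]))
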
How can I process this dataset in parallel efficiently, without requiring inter-process communication to look up values in this dictionary? I would welcome any suggestions for solving this problem!

I'm using Python 3.6.3 from Anaconda on Ubuntu on a machine with 1TB RAM.


Edit: What finally worked:

I was able to get this to work using Redis. To get around the issue in #9, I had to chunk the large key-value insertion and lookup queries into "bite-sized" batches, so that the work was still processed in batches but didn't time out from queries that were too large. Doing this allowed the insertion of the 86GB dictionary to be performed in 45 minutes (with 128 threads and some load balancing), and the subsequent processing was not hampered in performance by the Redis lookup queries (it finished in 2 days).
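
A minimal sketch of that batching pattern, assuming redis-py; the connection details, the batch size, and the names big_dict and keys_to_look_up are all stand-ins, not measured values:

import redis

r = redis.Redis(host='localhost', port=6379)  # connection details assumed

BATCH = 10000  # "bite-sized": large enough to amortize round-trips,
               # small enough that no single query is rejected

big_dict = {'foo': 'bar', 'key': 'value'}  # stand-in for the 86GB dict
keys_to_look_up = ['foo', 'key']           # stand-in query keys

def batches(seq, size):
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

# Insertion: push the dictionary in bite-sized mset batches.
items = list(big_dict.items())
for batch in batches(items, BATCH):
    r.mset(dict(batch))

# Lookup: fetch many values per round-trip with bite-sized mget batches.
for batch in batches(keys_to_look_up, BATCH):
    values = r.mget(batch)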

Thank you all for your help and suggestions.

asked by Jon Deaton


2 Answers

You should probably use a system that's meant for sharing large amounts of data with many different processes -- like a database.

Take your giant dataset, create a schema for it, and dump it into a database. You could even put it on a separate machine.

Then launch as many processes as you want, across as many hosts as you want, to process the data in parallel. Pretty much any modern database will be more than capable of handling the load.
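
For example, here is a minimal sketch using SQLite from the standard library -- chosen only as a concrete, dependency-free stand-in; a client/server database on a separate machine would follow the same shape, and the rows below are placeholders:

import sqlite3

# One-time build of the lookup table.
con = sqlite3.connect('mapping.db')
con.execute('CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)')
con.executemany('INSERT OR REPLACE INTO kv VALUES (?, ?)',
                [('foo', 'bar'), ('key', 'value')])  # placeholder rows
con.commit()
con.close()

# Each worker opens its own read-only connection once, then queries it.
def make_lookup():
    con = sqlite3.connect('file:mapping.db?mode=ro', uri=True)
    def lookup(key):
        row = con.execute('SELECT v FROM kv WHERE k = ?', (key,)).fetchone()
        return row[0] if row else None
    return lookup

lookup = make_lookup()
print(lookup('foo'))  # -> 'bar'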

answered by Brendan Abel


Instead of using a dictionary, use a data structure that compresses data but still has fast lookups.

e.g.:

  • keyvi: https://github.com/cliqz-oss/keyvi keyvi is an FSA-based key-value data structure optimized for space and lookup speed. Multiple processes reading from keyvi will reuse the memory, because a keyvi structure is memory-mapped and uses shared memory. Since your worker processes don't need to modify the data structure, I think this would be your best bet.

  • marisa trie: https://github.com/pytries/marisa-trie a static trie structure for Python, based on the marisa-trie C++ library. Like keyvi, marisa-trie uses memory-mapping, so multiple processes using the same trie will share the same memory.

EDIT:

To use keyvi for this task, you can first install it with pip install pykeyvi. Then use it like this:

from pykeyvi import StringDictionaryCompiler, Dictionary

# Create the dictionary
compiler = StringDictionaryCompiler()
compiler.Add('foo', 'bar')
compiler.Add('key', 'value')
compiler.Compile()
compiler.WriteToFile('test.keyvi')

# Use the dictionary
dct = Dictionary('test.keyvi')
print(dct['foo'].GetValue())  # -> 'bar'
print(dct['key'].GetValue())  # -> 'value'

marisa trie is just a trie, so it wouldn't work as a mapping out of the box, but you can, for example, use a delimiter character to separate keys from values (see the sketch below).
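
A minimal sketch of that delimiter idea, assuming the marisa-trie Python package and a separator character that never occurs in the keys (big_dict is a stand-in name):

import marisa_trie

SEP = '\t'  # assumed to appear in no key

# Build: store "key<SEP>value" strings and save the trie to disk.
big_dict = {'foo': 'bar', 'key': 'value'}  # stand-in data
trie = marisa_trie.Trie([f'{k}{SEP}{v}' for k, v in big_dict.items()])
trie.save('mapping.marisa')

# In each worker: memory-map the file, so all processes share one copy.
trie = marisa_trie.Trie()
trie.mmap('mapping.marisa')

def lookup(key):
    # Prefix search for "key<SEP>" finds the single matching entry.
    matches = trie.keys(key + SEP)
    return matches[0].split(SEP, 1)[1] if matches else None

print(lookup('foo'))  # -> 'bar'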

answered by tomas