I have a dictionary of Python pandas DataFrames. The total size of this dictionary is about 2 GB. However, when I share it across 16 multiprocessing workers (in the subprocesses I only read the data of the dict without modifying it), it takes 32 GB of RAM. So I would like to ask if it is possible to share this dictionary across processes without copying it. I tried converting it to a manager.dict(), but that seems to take too long. What would be the most standard way to achieve this? Thank you.
Passing messages to processes: a simple way to communicate between processes with multiprocessing is to use a Queue to pass messages back and forth. Any pickle-able object can pass through a Queue. The short example below only passes a single message to a single worker, then the main process waits for the worker to finish.
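A minimal sketch of that single-message pattern (my own illustration of the idea, not code from the original answer):

from multiprocessing import Process, Queue

def worker(queue):
    # Block until a message arrives, then report it.
    msg = queue.get()
    print('Worker received:', msg)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=worker, args=(queue,))
    p.start()
    queue.put('hello from the main process')  # any pickle-able object works
    p.join()  # wait for the worker to finish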
Python 3.8 introduced a new module, multiprocessing.shared_memory, that provides shared memory for direct access across processes. My test shows that it significantly reduces memory usage, which also speeds up the program by reducing the cost of copying and moving things around.
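A rough sketch of that approach, assuming a NumPy-backed column (the DataFrame itself has to be rebuilt around the shared array in each process, and the block name, shape and dtype passed to the workers separately; both halves are shown in one script here only for brevity):

import numpy as np
from multiprocessing import shared_memory

# Parent process: copy the DataFrame's underlying array into shared memory once.
data = np.random.rand(1_000_000)               # stand-in for df['col'].to_numpy()
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared[:] = data[:]

# Worker process: attach by name and wrap the same buffer; no copy is made.
existing = shared_memory.SharedMemory(name=shm.name)
view = np.ndarray(data.shape, dtype=data.dtype, buffer=existing.buf)
print(view[:5])

# Cleanup: every process close()s its handle, one process unlink()s the block.
existing.close()
shm.close()
shm.unlink()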
Numba can be used in two ways with pandas: specify the engine="numba" keyword in select pandas methods, or define your own Python function decorated with @jit and pass the underlying NumPy array of the Series or DataFrame (using to_numpy()) into the function.
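A small sketch of both routes, assuming numba is installed (the function and column names here are just placeholders):

import numpy as np
import pandas as pd
from numba import jit

@jit(nopython=True)
def squared_sum(values):
    # Plain loop over a NumPy array; Numba compiles it to machine code.
    total = 0.0
    for v in values:
        total += v * v
    return total

df = pd.DataFrame({'x': np.random.rand(100_000)})

# Route 1: engine keyword on a supported pandas method
rolled = df['x'].rolling(1_000).mean(engine='numba')

# Route 2: pass the underlying NumPy array into the jitted function
result = squared_sum(df['x'].to_numpy())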
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
The best solution I've found (and it only works for some types of problems) is to use a client/server setup using Python's BaseManager and SyncManager classes. To do this you first set up a server that serves up a proxy class for the data.
DataServer.py
#!/usr/bin/python
from multiprocessing.managers import SyncManager
import numpy

# Global for storing the data to be served
gData = {}

# Proxy class to be shared with different processes.
# Don't put big data in here since that will force it to be piped to the
# other process when instantiated there; instead just return a portion of
# the global data when requested.
class DataProxy(object):
    def __init__(self):
        pass

    def getData(self, key, default=None):
        global gData
        return gData.get(key, default)

if __name__ == '__main__':
    port = 5000

    print('Simulate loading some data')
    for i in range(1000):
        gData[i] = numpy.random.rand(1000)

    # Start the server on address (host, port)
    print('Serving data. Press <ctrl>-c to stop.')

    class myManager(SyncManager):
        pass

    myManager.register('DataProxy', DataProxy)
    mgr = myManager(address=('', port), authkey=b'DataProxy01')
    server = mgr.get_server()
    server.serve_forever()
Run the above once and leave it running. Below is the client class you use to access the data.
DataClient.py
from multiprocessing.managers import BaseManager
import psutil  # 3rd party module for process info (not strictly required)

# Grab the shared proxy class. All methods in that class will be available here.
class DataClient(object):
    def __init__(self, port):
        assert self._checkForProcess('DataServer.py'), 'Must have DataServer running'

        class myManager(BaseManager):
            pass

        myManager.register('DataProxy')
        self.mgr = myManager(address=('localhost', port), authkey=b'DataProxy01')
        self.mgr.connect()
        self.proxy = self.mgr.DataProxy()

    # Verify the server is running (not required)
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False
Below is the test code to try this with multiprocessing.
TestMP.py
#!/usr/bin/python
import time
import multiprocessing as mp
import numpy
from DataClient import *

# Confusing, but the "proxy" will be global to each subprocess;
# it's not shared across all processes.
gProxy = None
gMode = None
gDummy = None

def init(port, mode):
    global gProxy, gMode, gDummy
    gProxy = DataClient(port).proxy
    gMode = mode
    gDummy = numpy.random.rand(1000)  # Same as the dummy in the server
    # print('Init proxy', id(gProxy), 'in', mp.current_process())

def worker(key):
    global gProxy, gMode, gDummy
    if 0 == gMode:    # get from proxy
        array = gProxy.getData(key)
    elif 1 == gMode:  # bypass retrieval to test the difference
        array = gDummy
    else:
        assert 0, 'unknown mode: %s' % gMode
    for i in range(1000):
        x = sum(array)
    return x

if __name__ == '__main__':
    port = 5000
    maxkey = 1000
    numpts = 100

    for mode in [1, 0]:
        for nprocs in [16, 1]:
            if 0 == mode:
                print('Using client/server and %d processes' % nprocs)
            if 1 == mode:
                print('Using local data and %d processes' % nprocs)
            keys = [numpy.random.randint(0, maxkey) for k in range(numpts)]
            pool = mp.Pool(nprocs, initializer=init, initargs=(port, mode))
            start = time.time()
            ret_data = pool.map(worker, keys, chunksize=1)
            print('  took %4.3f seconds' % (time.time() - start))
            pool.close()
When I run this on my machine I get...
Using local data and 16 processes
  took 0.695 seconds
Using local data and 1 processes
  took 5.849 seconds
Using client/server and 16 processes
  took 0.811 seconds
Using client/server and 1 processes
  took 5.956 seconds
Whether this works for your multiprocessing system depends on how often you have to grab the data. There's a small overhead associated with each transfer; you can see it if you turn down the number of iterations in the x = sum(array) loop. At some point you'll spend more time getting data than working on it.
Besides multiprocessing, I also like this pattern because I only have to load my big array data once in the server program and it stays loaded until I kill the server. That means I can run a bunch of separate scripts against the data and they execute quickly; no waiting for data to load.
While the approach here is somewhat similar to using a database, it has the advantage of working on any type of Python object, not just simple DB tables of strings and ints, etc. I've found that using a DB is a bit faster for those simple types, but for me it tends to be more work programmatically, and my data doesn't always port over easily to a database.