I'm reading some gzipped data from S3 using dask (as a replacement for a SQL query). However, it looks like the data file, or the unzipped file, is being cached somewhere and stays in system memory. NB: this should be runnable; the test data comes from the pandas test suite in a public S3 bucket.
import dask.dataframe as dd
import pandas as pd
import psutil as ps
import os
#for easier vis
mb = 1048576
def mytestfunc(file):
    process = ps.Process(os.getpid())
    print('initial memory: {0}'.format(process.memory_info().rss/mb))
    data = dd.read_csv(file, compression = 'gzip', blocksize = None, storage_options = {'anon':True})
    print('dask plan memory: {0}'.format(process.memory_info().rss/mb))
    data = data.compute()
    print('data in memory: {0}'.format(process.memory_info().rss/mb))
    print('data frame usage: {0}'.format(data.memory_usage(deep=True).sum()/mb))
    return data
process = ps.Process(os.getpid())
print('before function call: {0}'.format(process.memory_info().rss/mb))
out = mytestfunc('s3://pandas-test/large_random.csv.gz')
print('After function call: {0}'.format(process.memory_info().rss/mb))
# out = mytestfunc('s3://pandas-test/tips.csv.gz')
# print('After smaller function call: {0}'.format(process.memory_info().rss/mb))
Which gives me:
before function call: 76.984375
initial memory: 76.984375
dask plan memory: 92.9921875
data in memory: 224.71484375
data frame usage: 38.14704895019531
After function call: 224.7265625
Naively, I would expect the 'after function call' figure to be the 'before function call' figure plus the dataframe and a bit of overhead. Here, the gzip is 43 MB and results in an overhead of about 90 MB. In my real example, this extra part is about 50 GB of extra memory for a 10 GB dataframe.
You can see that the memory is freed up if you rerun on another, smaller file - uncomment the rerun on the smaller file to see it. This also shows that the increase is due to the file size - you can switch the order and run 'tips' first and the memory stays at ~90mb.
I am guessing dask, s3fs or pandas is holding the file or the unzipped contents in a buffer somewhere, but I haven't been able to track it down to clear it.
Any ideas on how to reduce this memory use, or free the buffer?
EDIT: An example of the above output for some of my real data - 32 gzipped files:
before function call: 70.69921875
initial memory: 70.69921875
dask plan memory: 80.16015625
data in memory: 33991.69921875
data frame usage: 10824.553115844727
After function call: 33991.69921875
I understand dask will have a higher peak memory usage than a pandas loop over the same 32 files, but I still don't get why it doesn't get freed up.
The chunked version uses the least memory, but wallclock time isn't much better. The Dask version uses far less memory than the naive version, and finishes fastest (assuming you have CPUs to spare).
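As a rough illustration of the chunked approach mentioned above, here is a minimal sketch using the `chunksize` argument of `pandas.read_csv`, which yields the file piece by piece so that only one chunk is held in memory at a time. The helper name `chunked_column_sum`, the column names, and the tiny in-memory CSV are all made up for the example; a real file path would take the place of the `StringIO` object.

```python
import io

import pandas as pd


def chunked_column_sum(csv_source, column, chunksize=1000):
    """Sum one column while holding at most `chunksize` rows in memory."""
    total = 0.0
    # With chunksize set, read_csv returns an iterator of DataFrames
    # instead of loading the whole file at once.
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        total += chunk[column].sum()
    return total


# Small in-memory CSV standing in for a large file (hypothetical data).
csv_text = "a,b\n" + "\n".join(f"{i},{i * 2}" for i in range(10))
result = chunked_column_sum(io.StringIO(csv_text), "b", chunksize=3)
print(result)
```

This only helps when the computation can be expressed as a reduction over chunks. If the full dataframe is needed in memory at the end, chunking lowers the peak during parsing but not the final footprint.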
The original pandas query took 182 seconds and the optimized Dask query took 19 seconds, which is about 10 times faster. Dask can provide performance boosts over pandas because it can execute common operations in parallel, where pandas is limited to a single core.
Dask can enable efficient parallel computations on single machines by leveraging their multi-core CPUs and streaming data efficiently from disk. It can run on a distributed cluster. Dask also allows the user to replace clusters with a single-machine scheduler which would bring down the overhead.
When using pandas.read_csv in threads it appears that the Python process leaks a little memory. I've reduced it to a problem with pandas.read_csv and a concurrent.futures.ThreadPoolExecutor. This is raised on the pandas issue tracker here: https://github.com/pandas-dev/pandas/issues/19941
# imports
import pandas as pd
import numpy as np
import time
import psutil
from concurrent.futures import ThreadPoolExecutor
# prep
process = psutil.Process()
e = ThreadPoolExecutor(8)
# prepare csv file, only need to run once
pd.DataFrame(np.random.random((100000, 50))).to_csv('large_random.csv')
# baseline computation making pandas dataframes with threads. This works fine
def f(_):
    return pd.DataFrame(np.random.random((1000000, 50)))
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(f, range(8)))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')
# before: 57.0 MB
# after: 56.0 MB
# example with read_csv, this leaks memory
print('before:', process.memory_info().rss // 1e6, 'MB')
list(e.map(pd.read_csv, ['large_random.csv'] * 8))
time.sleep(1) # let things settle
print('after:', process.memory_info().rss // 1e6, 'MB')
# before: 58.0 MB
# after: 323.0 MB
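One thing that may be worth trying after the threaded read_csv calls above is asking the C allocator to hand freed memory back to the operating system. The sketch below calls glibc's `malloc_trim` through `ctypes`. It rests on assumptions: that the process runs on Linux with glibc, and that the retained memory is free heap space the allocator is holding on to rather than something still referenced. The helper name `trim_memory` is made up for this example, and on platforms without `malloc_trim` it simply returns False.

```python
import ctypes
import ctypes.util


def trim_memory():
    """Ask glibc to return free heap memory to the OS.

    Returns True if malloc_trim reports that memory was released,
    False otherwise (including on platforms without malloc_trim).
    """
    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return False
    try:
        libc = ctypes.CDLL(libc_name)
        # malloc_trim(0) returns 1 if memory was released, 0 if not.
        return bool(libc.malloc_trim(0))
    except (OSError, AttributeError):
        # Library could not be loaded, or it has no malloc_trim (non-glibc).
        return False


released = trim_memory()
print('memory released:', released)
```

Comparing process.memory_info().rss before and after the call would show whether it makes any difference in a given setup.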