I'm playing with some GitHub user data and was trying to create a graph of all people in the same city. To do this I need to use the merge operation in dask. Unfortunately the GitHub user base is 6M users, and it seems that the merge operation is causing the resulting dataframe to blow up. I used the following code:
import dask.dataframe as dd

# Read the same id/city columns twice so the frame can be self-joined on city
gh = dd.read_hdf('data/github.hd5', '/github', chunksize=5000, columns=['id', 'city']).dropna()
st = dd.read_hdf('data/github.hd5', '/github', chunksize=5000, columns=['id', 'city']).dropna()

# Self-join: pairs every user with every other user in the same city
mrg = gh.merge(st, on='city').drop('city', axis=1)
mrg['max'] = mrg.max(axis=1)
mrg['min'] = mrg.min(axis=1)
mrg.to_castra('github')
I can merge on other criteria such as name/username using this code, but I get a MemoryError when I try to run the code above.
I have tried running this with the synchronous, multiprocessing, and threaded schedulers.
I'm trying to do this on a Dell laptop with a 4-core i7 and 8GB RAM. Shouldn't dask do this operation in a chunked manner, or am I getting this wrong? Is writing the code using pandas DataFrame iterators the only way out?
Dask DataFrame merge: you can join a Dask DataFrame to a small pandas DataFrame using the dask.dataframe.merge() method, similar to the pandas API. To join two large Dask DataFrames, you use the exact same Python syntax.
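To make the small-pandas case concrete, here is a minimal sketch; the lookup table and its country column are made up for illustration. Because the right side is a plain pandas DataFrame, Dask applies the merge partition by partition, with no shuffle of the large frame:

import pandas as pd
import dask.dataframe as dd

# Hypothetical small lookup table held entirely in memory
small = pd.DataFrame({'city': ['Austin', 'Berlin'], 'country': ['US', 'DE']})

big = dd.read_hdf('data/github.hd5', '/github', chunksize=5000)

# Merging against a pandas DataFrame runs per partition, no shuffle needed
joined = big.merge(small, on='city', how='left')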
Using dask instead of pandas to merge large data sets: the Python package dask lets you run data analytics in parallel and out of core, which means it can be faster and more memory efficient than pandas for data that does not fit in RAM.
TL;DR: unmanaged memory is RAM that the Dask scheduler is not directly aware of; too much of it can make workers run out of memory and cause computations to hang or crash.
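One way to observe this on your own machine is to start a local cluster with explicit per-worker memory limits and watch the dashboard; the worker count and limit below are arbitrary example values, not a recommendation:

from dask.distributed import Client

# Four local workers, each capped at 2GB; the dashboard breaks memory usage
# down per worker, including the unmanaged portion
client = Client(n_workers=4, memory_limit='2GB')
print(client.dashboard_link)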
Lazy evaluation: most Dask collections, including Dask DataFrame, are evaluated lazily, which means Dask constructs the logic of your computation (called a task graph) immediately but evaluates it only when necessary.
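A small sketch of what laziness means in practice, reusing the file from the question:

import dask.dataframe as dd

df = dd.read_hdf('data/github.hd5', '/github', chunksize=5000)
n = df['id'].count()  # only builds the task graph; nothing is read yet
print(n.compute())    # compute() triggers the actual chunked work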
Castra isn't supported anymore, so using HDF is recommended. From the comments, writing to multiple files with to_hdf() solved the memory error:
mrg.to_hdf('github-*.hdf', '/data')
The * in the file name is replaced by the partition index, so each partition is written to its own file instead of being collected into one giant result. Note that to_hdf() also takes an HDF key as its second argument; the '/data' key here is an arbitrary choice.
Relevant documentation: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.to_hdf.html
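To load the result back, dd.read_hdf() accepts the same glob pattern; a minimal sketch, assuming the '/data' key used above:

import dask.dataframe as dd

mrg2 = dd.read_hdf('github-*.hdf', '/data')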