I'm new to Dask and finding it quite useful, but I have a problem I haven't been able to solve yet.
I have a data set larger than memory, and I want to remove duplicate values from a column.
The problem is that even after this removal the data set will still be larger than memory. Therefore, the result needs to be computed in chunks and saved directly to disk.
Of course, I could write code to do this removal manually, but I was wondering if Dask already has this implemented.
This is my code:
from dask.distributed import Client
import dask.dataframe as dd
client = Client(memory_limit='8GB') # I've tried without this limit
data = dd.read_csv("path_to_file", dtype={
'id': 'Int64'
}, sample=1000)
data.drop_duplicates(subset=['text'])
results = data.compute() # <- Here is the problem
results.to_csv("pathout", index=False)
When I call compute, the result is a pandas DataFrame, which in this case is larger than memory. I'm getting a lot of:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
and then execution fails with a KilledWorker error.
EDIT:
Self-contained example:
import numpy as np
import pandas as pd
from dask.distributed import Client
import dask.dataframe as dd
# Creates about 2 GB of data
data = np.random.randint(0, 10000, size=(2000000,200))
pd.DataFrame(data).to_csv('test_in.csv', index=False)
# If you want to run this from a terminal, uncomment the next line and indent the rest of the code
# if __name__ == '__main__':
# To test, limit dask to 1Gb
client = Client(n_workers=1, memory_limit='1GB')
df = dd.read_csv('test_in.csv', blocksize='16MB')
results = df.drop_duplicates()
results.to_csv('test_out.csv', index=False)
client.close()
from dask.distributed import Client
import dask.dataframe as dd
client = Client(memory_limit='8GB')
data = dd.read_csv("path_to_file", dtype={'id': 'Int64'}, sample=1000)
results = data.drop_duplicates(subset=['text']) # Don't call compute here
results.to_csv("pathout", index=False) # Write operations automatically call compute
.compute() returns a pandas DataFrame, and from there Dask is gone. Instead, use Dask's .to_csv(), which writes one file per partition.
Just remove the .compute() and it will work, as long as every individual partition fits into memory.
Also note that you need to assign the result of .drop_duplicates(); it returns a new DataFrame rather than modifying the original in place.