I have the code below. It uses dask.distributed to read 100 JSON files (cluster: 5 workers, 5 cores, 50.00 GB memory):
from dask.distributed import Client
import dask.dataframe as dd
client = Client('xxxxxxxx:8786')
df = dd.read_json('gs://xxxxxx/2018-04-18/data-*.json')
df = client.persist(df)
When I run the code, I see only one worker pick up the read_json() task. That worker then runs out of memory and I get a WorkerKilled error.
Should I manually read each file and concatenate them, or is dask supposed to do that under the hood?
You may want to use dask.bag instead of dask.dataframe:
import json
import dask.bag as db
mybag = db.read_text('gs://xxxxxx/2018-04-18/data-*.json').map(json.loads)
After that you can convert the bag into a dask dataframe with
mybag.to_dataframe()
This may require some additional .map calls on the bag to get the structure right.
If your data is Hadoop-style JSON (one object per line, also called JSON Lines), the bag approach still works. read_text yields one element per line, so json.loads is applied to each line individually.
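The following standard-library sketch illustrates why line-by-line parsing depends on the layout. Each line of a JSON Lines file is a complete document, so parsing per line works. A single pretty-printed document split into lines does not parse that way and has to be loaded whole. The sample strings and the parse_per_line helper are made up for illustration.

```python
import json

# One object per line ("Hadoop style" / JSON Lines): every line is a full document.
json_lines = '{"id": 1, "v": "a"}\n{"id": 2, "v": "b"}\n'

# A single pretty-printed document spread over several lines.
pretty = json.dumps([{"id": 1, "v": "a"}, {"id": 2, "v": "b"}], indent=2)


def parse_per_line(text):
    """Call json.loads once per line, like read_text(...).map(json.loads).

    Unlike read_text, this helper skips blank lines.
    """
    return [json.loads(line) for line in text.splitlines() if line.strip()]


records = parse_per_line(json_lines)

# Per-line parsing fails on the pretty-printed document: "[" alone is not valid JSON.
try:
    parse_per_line(pretty)
    per_line_ok = True
except json.JSONDecodeError:
    per_line_ok = False

# That document has to be parsed as a whole instead.
whole = json.loads(pretty)
print(records, per_line_ok, whole)
```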