I am struggling to convert a dask.bag of dictionaries into dask.delayed pandas.DataFrames and then into a final dask.dataframe
I have one function (make_dict) that reads files into a rather complex nested dictionary structure and another function (make_df) that turns these dictionaries into a pandas.DataFrame (each resulting DataFrame is around 100 MB per file). I would like to append all DataFrames into a single dask.dataframe for further analysis.
Up to now I have been using dask.delayed objects to load, convert and append all data, which works fine (see example below). For future work, however, I would like to store the loaded dictionaries in a dask.bag using dask.persist().
I managed to load the data into a dask.bag, resulting in a list of dicts or a list of pandas.DataFrame objects that I can use locally after calling compute(). When I tried turning the dask.bag into a dask.dataframe using to_delayed(), however, I got stuck with an error (see below).
It feels like I am missing something rather simple here, or maybe my approach to dask.bag is wrong?
The example below shows my approach using simplified functions and throws the same error. Any advice on how to tackle this is appreciated.
import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag
print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2
def make_dict(n=1):
    return {"name": "dictionary", "data": {'A': np.arange(n), 'B': np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])

k = [1,2,3]
# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected
# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)
df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails
# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [ A B
# 0 0 0]
What I would ultimately like to do, using the distributed scheduler:
b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())
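In the bag case, each delayed object points to a list of elements (one list per partition), so you end up with a list of lists of pandas DataFrames, whereas dask.dataframe.from_delayed expects each delayed object to produce a single pandas DataFrame. Two recommendations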