
Convert Dask Bag of Pandas DataFrames to a single Dask DataFrame

Summary of Problem

Short Version

How do I go from a Dask Bag of Pandas DataFrames, to a single Dask DataFrame?

Long Version

I have a number of files that are not readable by any of dask.dataframe's various read functions (e.g. dd.read_csv or dd.read_parquet). I do have my own function that reads them in as Pandas DataFrames (the function works on only one file at a time, akin to pd.read_csv). I would like to have all of these single Pandas DataFrames in one large Dask DataFrame.

Minimum Working Example

Here's some example CSV data (my data isn't actually in CSVs, but I'm using them here for ease of example). To create a minimum working example, save this as a CSV, make a few copies, and then use the code below:

"gender","race/ethnicity","parental level of education","lunch","test preparation course","math score","reading score","writing score"
"female","group B","bachelor's degree","standard","none","72","72","74"
"female","group C","some college","standard","completed","69","90","88"
"female","group B","master's degree","standard","none","90","95","93"
"male","group A","associate's degree","free/reduced","none","47","57","44"
"male","group C","some college","standard","none","76","78","75"

from glob import glob
import pandas as pd
import dask.bag as db

files = glob('/path/to/your/csvs/*.csv')
bag = db.from_sequence(files).map(pd.read_csv)

What I've tried so far

import pandas as pd
import dask.bag as db
import dask.dataframe as dd

# Create a Dask bag of pandas dataframes
bag = db.from_sequence(list_of_files).map(my_reader_function)

df = bag.map(lambda x: x.to_records()).to_dataframe()  # this doesn't work
df = bag.map(lambda x: x.to_dict(orient=<any option>)).to_dataframe()  # neither does this

# This gets me really close: it's a bag of Dask DataFrames,
# but I can't figure out how to concatenate them together.
df = bag.map(dd.from_pandas, npartitions=1)

df = dd.from_delayed(bag)  # returns an error
Asked by natemcintosh on Oct 25 '25


2 Answers

I recommend using dask.delayed with dask.dataframe. There is a good example of exactly this pattern here:

https://docs.dask.org/en/latest/delayed-collections.html

Answered by MRocklin on Oct 27 '25


Here are two additional possible solutions:

1. Convert the bag to a list of dataframes, then use dd.concat:

bag  # a dask bag of pandas DataFrames
list_of_dfs = bag.compute()  # a plain Python list of pandas DataFrames
df = dd.concat(list_of_dfs)  # lazily concatenate into one Dask DataFrame

(Note that bag.compute() pulls every frame into memory at once, and dd.concat is the public alias for dd.multi.concat.)

2. Convert to a bag of dictionaries and use bag.to_dataframe:

# to_dict(orient='records') yields a list of row-dicts per frame;
# flatten turns the bag of lists into a bag of individual rows.
bag_of_dicts = bag.map(lambda df: df.to_dict(orient='records')).flatten()
df = bag_of_dicts.to_dataframe()

In my own specific use case, option #2 performed better than option #1.

Answered by Kevin Taylor on Oct 27 '25