
Distributing rows amongst partitions in a Dask DataFrame


Expectation: When I partition a dataframe, I expect the rows to be distributed roughly evenly across the partitions. When I then write the dataframe to CSV, I expect the resulting n CSV files (in this case, 10) to be of roughly equal length.

Reality: When I run the code below, the rows are not spread evenly at all: every row ends up in export_results-0.csv and the remaining 9 CSV files are empty.

Question: Are there additional configurations that I need to set to ensure that rows are distributed amongst all the partitions?

from dask.distributed import Client
import dask.dataframe as dd
import numpy as np
import pandas as pd

client = Client('tcp://10.0.0.60:8786')

df = pd.DataFrame({'geom': np.random.random(1000)}, index=np.arange(1000))
sd = dd.from_pandas(df, npartitions=10)

tall = dd.merge(sd.assign(key=0), sd.assign(key=0), on='key').drop('key', axis=1)
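# to_csv runs immediately by default and writes one file per partition,
# so no trailing .compute() is needed.
tall.to_csv('export_results-*.csv')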

About the above code: I create a 1,000-row dataframe and merge it with itself to produce a 1,000,000-row dataframe. The eventual goal is to generate a tall, thin table that holds the distance from every geometry to every other geometry in a list of 100k+.

asked Jun 16 '17 20:06 by kuanb



1 Answer

The dataframe performance section of the Dask docs notes that joins between two Dask dataframes can be very expensive.

By joining a Dask dataframe to a Pandas dataframe, I seem to be able to preserve partitioning. Here's an example modification to the above code:

df1 = pd.DataFrame({ 'geom': np.random.random(200) }, index=np.arange(200))
sd1 = dd.from_pandas(df1.copy(), npartitions=5).assign(key=0)

tall = dd.merge(sd1, df1.assign(key=0), on='key', npartitions=10).drop('key', axis=1)
tall.to_csv('exported_csvs/res-*.csv')

Now, this achieves the goal of maintaining partitions. That said, I would still be interested in understanding why partitions seemingly can't be preserved when merging two Dask dataframes.
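On that open question, one plausible explanation is the following. Assuming the Dask-to-Dask merge runs as a hash-partitioned join (rows are routed to output partitions by a hash of the join key), a constant key hashes to a single value, so every row is sent to the same partition. The sketch below illustrates that routing with pandas alone. The modulo step is a simplified stand-in for Dask's shuffle, not its actual implementation, and `bucket_counts` is a hypothetical helper name.

```python
import numpy as np
import pandas as pd

def bucket_counts(keys: pd.Series, npartitions: int) -> pd.Series:
    """Count how many rows a hash-mod-n routing would send to each bucket."""
    hashes = pd.util.hash_pandas_object(keys, index=False)
    buckets = hashes % npartitions
    return buckets.value_counts().sort_index()

# Constant join key, as in sd.assign(key=0): all rows share one hash.
constant = bucket_counts(pd.Series(np.zeros(1000, dtype='int64')), 10)

# Distinct keys for comparison: rows spread over several buckets.
varied = bucket_counts(pd.Series(np.arange(1000)), 10)

print(len(constant), len(varied))
```

If this is indeed the cause, it would also explain why merging against an in-memory pandas frame behaves differently: no shuffle on the key is needed there, since each Dask partition can be merged with the pandas frame independently.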

answered Oct 12 '22 11:10 by kuanb
