I have a Dask dataframe (df) with around 250 million rows (from a 10 GB CSV file). I have another pandas dataframe (ndf) of 25,000 rows. I would like to add the first column of the pandas dataframe to the Dask dataframe, repeating every item 10,000 times.
Here's the code that I tried. I have reduced the problem to a smaller size.
import dask.dataframe as dd
import pandas as pd
import numpy as np
pd.DataFrame(np.random.rand(25000, 2)).to_csv("tempfile.csv")
df = dd.read_csv("tempfile.csv")
ndf = pd.DataFrame(np.random.randint(1000, 3500, size=2500))
df['Node'] = np.repeat(ndf[0], 10)
With this code, I end up with an error.
ValueError: Not all divisions are known, can't align partitions. Please use set_index to set the index.
I can perform a reset_index() followed by a set_index() to make df.known_divisions True for the Dask dataframe, but it is a time-consuming operation. Is there a better, faster way to do what I am trying to do? Can I do this using pandas itself?
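For reference, a minimal sketch of one way to end up with known divisions (this is an assumption about the workaround, not necessarily the exact steps used; note that Dask's reset_index restarts at 0 within every partition, so a global row number has to be built explicitly, and the extra pass plus set_index is what makes this slow):
df['row'] = 1
df['row'] = df['row'].cumsum() - 1       # global 0-based row number
df = df.set_index('row', sorted=True)    # sorted=True avoids a shuffle but still scans the data
df.known_divisions                       # now True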
The end goal is to find rows from ndf where any of the corresponding rows from df matches some criteria.
Your basic algorithm is: "I'd like the first 10 values of df['Node'] to be set to the first value of ndf, the next 10 values to the next value of ndf, and so on." The reason this is hard in Dask is that it doesn't know how many rows are in each partition: you are reading from CSV, and the number of rows you get from X bytes depends on exactly what the data are like in each part. Other formats give you more information...
You will, therefore, certainly need two passes through the data. You could work with the index to figure out divisions and potentially do some sorting. To my mind, the easiest thing you can do is simply to measure the partition lengths, and so get the global offset of the start of each:
lengths = df.map_partitions(len).compute()
offsets = np.concatenate(([0], np.cumsum(lengths.values)[:-1]))  # global start row of each partition
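For illustration, with made-up partition lengths the bookkeeping works out as follows (the numbers are hypothetical; real lengths depend on where the CSV block boundaries fall):
lengths_demo = np.array([9000, 8000, 8000])
offsets_demo = np.concatenate(([0], np.cumsum(lengths_demo)[:-1]))
print(offsets_demo)   # [    0  9000 17000] -- the global row number at which each partition starts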
and now use a custom delayed function to work on the parts:
import dask

@dask.delayed
def add_node(part, offset, ndf):
    # global row numbers for this partition, floor-divided by the repeat factor (10)
    index = pd.Series(np.arange(offset, offset + len(part)) // 10,
                      index=part.index)
    # look up the corresponding value in ndf's first column
    part['Node'] = index.map(ndf[0])
    return part

df2 = dd.from_delayed([add_node(d, off, ndf)
                       for d, off in zip(df.to_delayed(), offsets)])
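df2 is still lazy; nothing is read until you compute. With the Node column attached, the end goal from the question (find the rows of ndf whose corresponding rows in df match some criterion) can be expressed directly. A sketch with a hypothetical criterion on the first data column, which the CSV round-trip in the question names '0':
# Hypothetical criterion: keep Node values whose df rows have column '0' above 0.9
matching_nodes = df2[df2['0'] > 0.9]['Node'].unique().compute()

# Rows of ndf whose value appeared next to at least one matching row of df
rows_of_interest = ndf[ndf[0].isin(matching_nodes)]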