There is something I don't quite understand about dask.dataframe behavior. Let's say I want to replicate this from pandas:
import pandas as pd
import dask.dataframe as dd
import random

s = "abcd"
lst = 10*[0] + list(range(1, 6))
n = 100
df = pd.DataFrame({"col1": [random.choice(s) for i in range(n)],
                   "col2": [random.choice(lst) for i in range(n)]})
# I will need a hash in dask
df["hash"] = 2*df.col1
df = df[["hash", "col1", "col2"]]

def fun(data):
    if data["col2"].mean() > 1:
        data["col3"] = 2
    else:
        data["col3"] = 1
    return data

df1 = df.groupby("col1").apply(fun)
df1.head()
This returns:
hash col1 col2 col3
0 dd d 0 1
1 aa a 0 2
2 bb b 0 1
3 bb b 0 1
4 aa a 0 2
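(As an aside, for a per-group scalar like this, the same col3 can also be computed without apply by broadcasting the group mean back to every row with groupby().transform; a sketch, not part of the original question:)

```python
import random

import pandas as pd

random.seed(0)
s = "abcd"
lst = 10 * [0] + list(range(1, 6))
n = 100
df = pd.DataFrame({"col1": [random.choice(s) for i in range(n)],
                   "col2": [random.choice(lst) for i in range(n)]})

# transform broadcasts each group's mean back onto its rows,
# so the threshold test becomes a vectorised column operation
df["col3"] = (df.groupby("col1")["col2"].transform("mean") > 1).map({True: 2, False: 1})
```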
In Dask I tried:
def fun2(data):
    if data["col2"].mean() > 1:
        return 2
    else:
        return 1

ddf = df.copy()
ddf.set_index("hash", inplace=True)
ddf = dd.from_pandas(ddf, npartitions=2)
gpb = ddf.groupby("col1").apply(fun2, meta=pd.Series())
The groupby leads to the same result as in pandas, but I'm having a hard time merging the result back as a new column while preserving the hash index. I'd like to get the following result:
col1 col2 col3
hash
aa a 5 2
aa a 0 2
aa a 0 2
aa a 0 2
aa a 4 2
UPDATE
Playing with merge I found this solution
ddf1 = dd.merge(ddf, gpb.to_frame(),
                left_on="col1",
                left_index=False, right_index=True)
ddf1 = ddf1.rename(columns={0: "col3"})
I'm not sure how this would work if I had to group by several columns. Plus, it's not exactly elegant.
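(One way the merge can generalise to a multi-column groupby is to pass a list of keys on the left and merge against the resulting MultiIndex on the right. A pandas sketch with made-up data and a hypothetical second key column colx, under the assumption that dd.merge accepts the same left_on/right_index combination:)

```python
import pandas as pd

# toy data with two grouping keys, col1 and colx (both invented here)
df = pd.DataFrame({"col1": list("aabb"),
                   "colx": [1, 1, 2, 2],
                   "col2": [0, 0, 0, 4]})

def fun2(data):
    return 2 if data["col2"].mean() > 1 else 1

# grouping over two columns yields a Series indexed by a MultiIndex
gpb = df.groupby(["col1", "colx"]).apply(fun2).rename("col3")

# a list of left keys can be merged against that MultiIndex
out = pd.merge(df, gpb.to_frame(),
               left_on=["col1", "colx"], right_index=True)
```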
How about using join?
This is your dask code, except that the Series is given a name, pd.Series(name='col3'):
def fun2(data):
    if data["col2"].mean() > 1:
        return 2
    else:
        return 1

ddf = df.copy()
ddf.set_index("hash", inplace=True)
ddf = dd.from_pandas(ddf, npartitions=2)
gpb = ddf.groupby("col1").apply(fun2, meta=pd.Series(name='col3'))
then the join (note the result has to be assigned back, otherwise the printed frame won't have col3):
ddf = ddf.join(gpb.to_frame(), on='col1')
print(ddf.compute().head())
col1 col2 col3
hash
cc c 0 2
cc c 0 2
cc c 0 2
cc c 2 2
cc c 0 2