I have a dask dataframe grouped by the index (first_name).
import pandas as pd
import numpy as np
from multiprocessing import cpu_count
from dask import dataframe as dd
from dask.multiprocessing import get
from dask.distributed import Client
NCORES = cpu_count()
client = Client()
entities = pd.DataFrame({'first_name': ['Jake', 'John', 'Danae', 'Beatriz', 'Jacke', 'Jon'],
                         'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'],
                         'ID': ['X', 'U', 'X', 'Y', '12', '13']})
df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))
(Obviously entities in real life has several thousand rows.)
I want to apply a user-defined function to each grouped dataframe. I want to compare each row with all the other rows in the group (something similar to Pandas compare each row with all rows in data frame and save results in list for each row).
The following is the function that I'm trying to apply:
from fuzzywuzzy import fuzz

def contraster(x, DF):
    matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis=1)
    return [i for i, x in enumerate(matches) if x]
For the test entities data frame, you could apply the function as usual:
entities.apply(lambda row: contraster(row['last_name'], entities), axis=1)
And the expected result is:
Out[35]:
0 [0, 4]
1 [1, 5]
2 [2]
3 [3]
4 [0, 4]
5 [1, 5]
dtype: object
When entities is huge, the solution is to use dask. Note that DF in the contraster function must be the grouped dataframe.
I am trying to use the following:

df.groupby('first_name').apply(func=contraster, args=????)

But how should I specify the grouped dataframe (i.e. DF in contraster)?
The function you provide to groupby-apply should take a Pandas dataframe or series as input and ideally return one (or a scalar value) as output. Extra parameters are fine, but they should be secondary, not the first argument. This is the same in both Pandas and Dask dataframe.
def func(df, x=None):
    # do whatever you want here
    # the input to this function will have all the same first name
    return pd.DataFrame({'x': [x] * len(df),
                         'count': len(df),
                         'first_name': df.first_name})
You can then call df.groupby as normal:
import pandas as pd
import dask.dataframe as dd
df = pd.DataFrame({'first_name':['Alice', 'Alice', 'Bob'],
'last_name': ['Adams', 'Jones', 'Smith']})
ddf = dd.from_pandas(df, npartitions=2)
ddf.groupby('first_name').apply(func, x=3).compute()
This will produce the same output in either pandas or dask.dataframe:
   count first_name  x
0      2      Alice  3
1      2      Alice  3
2      1        Bob  3
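To answer the original question directly: the grouped dataframe is exactly what apply hands to your function as its first argument, so contraster can be rewritten to take the group itself rather than receive it through args. Below is a minimal sketch under that contract; the threshold parameter and the matches name in meta are illustrative choices of mine, not part of any API, and it assumes fuzzywuzzy is installed. The meta hint just tells Dask the output is an object-dtype series so it can skip inferring it:

from fuzzywuzzy import fuzz

def contraster(df, threshold=50):
    # df holds all the rows that share one first_name; compare each row's
    # last_name against every other last_name in the same group
    names = list(df['last_name'])
    return pd.Series([[i for i, other in enumerate(names)
                       if fuzz.partial_ratio(other, name) >= threshold]
                      for name in names],
                     index=df.index)

ddf.groupby('first_name').apply(contraster, meta=('matches', 'object')).compute()

Each row then gets the positional indices (within its group) of the last names that clear the threshold, mirroring the per-row lists in the question's expected output.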