I'm processing a data set using Dask (since it doesn't fit in memory) and I want to group the rows with a different aggregation function depending on the column and its type.
Dask has a set of default aggregation functions for numerical data types, but not for strings/objects. Is there a way to implement a user-defined aggregation function for strings, somewhat similar to the example below?
atts_to_group = ['A', 'B']
agg_fn = {
  'C': 'mean',  # int
  'D': 'concatenate_fn1',  # string - no default fn for strings - doesn't work
  'E': 'concatenate_fn2',  # string
}
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()
At this point I can read the whole data set into memory after dropping irrelevant columns/rows, but I'd prefer to continue the processing in Dask, since it performs the required operations faster.
Edit: I tried adding a custom function directly to the dictionary:
def custom_concat(df):
    ...
    return df_concatd
agg_fn = {
  'C': 'mean',  # int
  'D': custom_concat(df),
}
-------------------------------------------------------
ValueError: unknown aggregate Dask DataFrame Structure:
I realised that Dask provides an Aggregation object (dd.Aggregation) for this. A custom aggregation can be defined as follows:
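import dask.dataframe as dd
# Concatenates the strings of each group and separates them using ","
custom_concat_D = dd.Aggregation(
    'custom_concat',
    # chunk: join the strings of each group within one partition
    lambda s: s.apply(lambda x: ",".join(x.astype(str))),
    # agg: join the per-partition results of each group
    lambda s0: s0.apply(lambda x: ",".join(x)),
)
custom_concat_E = ...
atts_to_group = ['A', 'B']
agg_fn = {
  'C': 'mean',  # int
  'D': custom_concat_D,
  'E': custom_concat_E,
}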
ddf = ddf.groupby(atts_to_group).agg(agg_fn).compute().reset_index()
This can also be done with groupby(...).apply for a less verbose solution:
import pandas as pd
def agg_fn(x):
    # x is the pandas DataFrame holding the rows of one group
    return pd.Series(
        dict(
            C = x['C'].mean(), # int
            D = "{%s}" % ', '.join(x['D']), # string (concat strings)
            E = ...
        )
    )
ddf = ddf.groupby(atts_to_group).apply(agg_fn).compute().reset_index()