Applying Python function to Pandas grouped DataFrame - what's the most efficient approach to speed up the computations?

I'm dealing with a quite large Pandas DataFrame. My dataset resembles the following df setup:

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Note: For the purpose of having a minimal example, it can easily be subset (for example with df.loc[df['time'] <= 400, :]), but since I simulate the data anyway, I thought that the original size would give a better overview.

For each group defined by ['measurement_id', 'time', 'group'] I need to call the following function:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

To enhance the performance I tried two approaches:

Pandarallel package

The first approach was to parallelise the computations using the pandarallel package:

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

However, this seems to be sub-optimal, as it consumes a lot of RAM and not all cores are used in the computations ( even when the number of cores is specified explicitly in the pandarallel.initialize() method ). Also, the computations are sometimes terminated with various errors, although I have not had a chance to find the reason for that ( possibly a lack of RAM? ).

PySpark Pandas UDF

I also gave a Spark Pandas UDF a go, although I am totally new to Spark. Here's my attempt:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                                      \
        .groupBy( ['measurement_id', 'time', 'group'] )        \
        .apply( cluster )                                       \
        .toPandas()

Unfortunately, the performance was unsatisfactory as well. From what I read on the topic, this may simply be the burden of using a UDF written in Python, with the associated need to convert all Python objects to Spark objects and back.

So here are my questions:

  1. Could either of my approaches be adjusted to eliminate possible bottlenecks and improve the performance? (e.g. PySpark setup, adjusting sub-optimal operations etc.)
  2. Are there any better alternatives? How do they compare to the provided solutions in terms of performance?
Kuba_ asked Feb 24 '20 11:02


1 Answer

Q : "Could either of my approaches be adjusted to eliminate possible bottlenecks and improve the performance? ( e.g. PySpark setup, adjusting sub-optimal operations etc. )"

+1 for mentioning the setup add-on overhead costs for either strategy of computing. These costs always set a break-even point: only beyond it can a non-[SERIAL] strategy deliver any of the wished-for [TIME]-Domain speedup, and only if the other, typically [SPACE]-Domain, costs stay feasible ( yes, RAM ... the existence of and access to a device of such a size, budget and other similar real-world constraints ).

First,
the pre-flight check, before we take off

The new, overhead-strict formulation of Amdahl's Law incorporates both of these add-on overheads, pSO + pTO, and reflects them when predicting the achievable speedup levels, including the break-even point beyond which it may become meaningful ( in a cost/effect, efficiency sense ) to go parallel.
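A minimal sketch of such an overhead-aware speedup estimate follows. It assumes that pSO and pTO stand for the parallel setup and termination overheads, expressed as fractions of the serial runtime, and that they simply add to the denominator of the classic Amdahl formula. The function names and the sample numbers are hypothetical and are not taken from the post.

```python
def overhead_strict_speedup(p, n, pSO=0.0, pTO=0.0):
    """Estimated speedup of a job whose parallelisable fraction is p when
    run on n workers.

    Assumption: pSO / pTO are the add-on setup / termination overheads,
    given as fractions of the original serial runtime.
    """
    if not 0.0 <= p <= 1.0:
        raise ValueError("p must lie in [0, 1]")
    if n < 1:
        raise ValueError("n must be at least 1")
    return 1.0 / ((1.0 - p) + pSO + p / n + pTO)


def break_even_workers(p, pSO, pTO, max_workers=1024):
    """Smallest worker count for which going parallel beats the serial run,
    or None if the overheads can never be paid back within max_workers."""
    for n in range(2, max_workers + 1):
        if overhead_strict_speedup(p, n, pSO, pTO) > 1.0:
            return n
    return None


# Hypothetical numbers: 95 % parallelisable work, overheads worth 50 % of
# the serial runtime in total.
print(overhead_strict_speedup(0.95, 8, pSO=0.3, pTO=0.2))
print(break_even_workers(0.95, pSO=0.3, pTO=0.2))
```

With zero overheads the function reduces to the classic Amdahl formula. Once pSO + pTO exceeds the parallelisable fraction p, no number of workers reaches a speedup above 1.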

Yet,
that is not our core problem here.
This comes next :

Next,
given the computational costs of SpectralClustering(), which is going here to use the Radial Basis Function ( RBF ) kernel ~ exp( -gamma * distance( data, data )**2 ), there seems to be no advantage in splitting the data-object over any number of disjoint work-units, as the distance( data, data )-component, by definition, has to visit all the data-elements. ( Ref.: the communication costs of any-to-any value-passing { process | node }-distributed topologies are, for obvious reasons, awfully bad, if not the worst use-cases for { process | node }-distributed processing, if not straight anti-patterns, except for some indeed arcane, memory-less / state-less computing fabrics. )
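The any-to-any nature of that kernel can be illustrated with a short NumPy sketch. It assumes 1-D input, as in the question's cluster() function, and gamma = 1.0, which is the scikit-learn default for SpectralClustering. The helper name rbf_affinity and the sample data are hypothetical.

```python
import numpy as np


def rbf_affinity(values, gamma=1.0):
    """Dense RBF affinity matrix exp(-gamma * d(i, j)**2) for 1-D data.

    Every element is compared with every other element, so the result
    has n * n entries for n input values.
    """
    x = np.asarray(values, dtype=float)[:, np.newaxis]  # shape (n, 1)
    squared_distances = (x - x.T) ** 2                  # shape (n, n), via broadcasting
    return np.exp(-gamma * squared_distances)


# Hypothetical sample: ten uniformly distributed values.
rng = np.random.default_rng(42)
sample_values = rng.uniform(0, 100, 10)
affinity = rbf_affinity(sample_values)
print(affinity.shape)
```

The matrix is symmetric with ones on its diagonal, and its size grows quadratically with the number of values handed to a single fit() call.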

For pedantic analysts, yes: add to this ( and we may already call it a bad state ) the costs of the, again any-to-any, k-means processing, here about O( N^( 1 + 5 * 5 ) ), which, for N ~ len( data ) ~ 1.12E6+, goes awfully against our wish to have some smart and fast processing.

So what?

While the setup costs are not to be neglected, the increased communication costs will almost surely wipe out any improvement from the attempts sketched above to move from a pure-[SERIAL] process flow into some form of just-[CONCURRENT] or True-[PARALLEL] orchestration of some work-sub-units, due to the increased overheads of having to implement ( a tandem pair of ) any-to-any value-passing topologies.

If it weren't for 'em?

Well, this sounds like a Computing Science oxymoron. Even if it were possible, pre-computing the any-to-any distances would take those immense [TIME]-Domain complexity costs "beforehand" ( Where? How? Is there any other, unavoidable latency that would permit masking them by some, so far unknown, incremental build-up of a complete-in-future any-to-any distance matrix? ). It would only reposition these inherently present costs to some other location in the [TIME]- and [SPACE]-Domains, not reduce them.

Q : "Are there any better alternatives?"

The only one I am aware of so far is to try whether the problem can be re-formulated into another, QUBO-formulated, problem ( ref.: Quadratic Unconstrained Binary Optimisation; the good news is that tools for doing so, a base of first-hand knowledge and practical problem-solving experience exist and grow larger ).
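For readers unfamiliar with the term, the sketch below shows only the general form of a QUBO problem: minimise x^T Q x over binary vectors x. The matrix Q is a hypothetical toy instance unrelated to the clustering task. The exhaustive search is there only to make the objective concrete, and it is not the kind of solver the answer refers to.

```python
from itertools import product


def qubo_energy(Q, x):
    """Value of the QUBO objective x^T Q x for a binary assignment x."""
    n = len(x)
    return sum(Q[i][j] * x[i] * x[j] for i in range(n) for j in range(n))


def brute_force_qubo(Q):
    """Try all 2**n binary assignments and return (best_x, best_energy).

    Only feasible for toy sizes, since the search space doubles with
    every added variable.
    """
    n = len(Q)
    best_x = min(product((0, 1), repeat=n), key=lambda x: qubo_energy(Q, x))
    return best_x, qubo_energy(Q, best_x)


# Hypothetical toy instance: each variable is rewarded for being 1 (diagonal),
# while neighbouring variables are penalised for both being 1 (off-diagonal).
Q = [[-1,  2,  0],
     [ 0, -1,  2],
     [ 0,  0, -1]]

best_x, best_energy = brute_force_qubo(Q)
print(best_x, best_energy)
```

Re-casting a clustering task in this form means encoding cluster memberships as the binary variables and the clustering cost as the entries of Q, which is a separate modelling step not shown here.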

Q : "How do they compare to the provided solutions in terms of performance?"

The performance is breathtaking: a QUBO-formulated problem has a promising O(1) (!) solver, constant in the [TIME]-Domain and somewhat restricted in the [SPACE]-Domain ( where recently announced LLNL tricks may help avoid this physical-world constraint that current QPU implementations put on problem sizes ).

user3666197 answered Sep 17 '22 12:09