I'm dealing with a fairly large Pandas DataFrame - my dataset resembles the following df setup:
import pandas as pd
import numpy as np
#--------------------------------------------- SIZING PARAMETERS :
R1 = 20 # .repeat( repeats = R1 )
R2 = 10 # .repeat( repeats = R2 )
R3 = 541680 # .repeat( repeats = [ R3, R4 ] )
R4 = 576720 # .repeat( repeats = [ R3, R4 ] )
T = 55920 # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used
#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
{ 'measurement_id': np.repeat( [0, 1], repeats = [ R3, R4 ] ),
'time':np.concatenate( [ np.repeat( A1, repeats = R1 ),
np.repeat( A2, repeats = R1 ) ] ),
'group': np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
'object': np.tile( np.arange( 0, R1 ), T )
}
)
#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
.explode() \
.astype( 'float' ) \
.to_frame( 'var' ) \
.reset_index( drop = True )
], axis = 1
)
Note: for the purpose of having a minimal example, the frame can easily be subsetted (for example with df.loc[df['time'] <= 400, :]), but since I simulate the data anyway, I thought the original size would give a better overview.
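As a quick sanity check on those sizing parameters ( my own arithmetic, not part of the original post ):

```python
import numpy as np

# Sizing parameters copied from the setup above
R1, R2, R3, R4 = 20, 10, 541680, 576720
A1 = np.arange( 0, 2708400, 100 )
A2 = np.arange( 0, 2883600, 100 )

# each A1 / A2 timestamp is repeated R1 = 20 times
assert len( A1 ) * R1 == R3
assert len( A2 ) * R1 == R4

total_rows = R3 + R4
print( total_rows )           # 1118400 rows in df
print( total_rows // R2 )     # 111840 ( measurement_id, time, group ) groups of R2 = 10 rows
```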
For each group defined by ['measurement_id', 'time', 'group'] I need to call the following function:
from sklearn.cluster import SpectralClustering
from pandarallel import pandarallel
def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters = 5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )
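For illustration ( a small self-contained sketch, not part of the original post ), calling cluster() on one synthetic group of R2 = 10 values returns a Series of cluster ids 1..5 indexed by object:

```python
import numpy as np
import pandas as pd
from sklearn.cluster import SpectralClustering

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters = 5, random_state = 42 ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

# one synthetic group of R2 = 10 'var' values, keyed by 'object'
rng = np.random.default_rng( 0 )
var = pd.Series( rng.uniform( 0, 100, 10 ) )
obj = pd.Series( np.arange( 10 ) )

labels = cluster( var, obj )
print( labels )                        # Series of cluster ids, indexed 0..9
assert labels.between( 1, 5 ).all()    # five clusters, labels shifted to 1..5
```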
To enhance the performance I tried two approaches. The first was to parallelise the computations using the pandarallel package:
pandarallel.initialize( progress_bar = True )
df \
.groupby( ['measurement_id', 'time', 'group'] ) \
.parallel_apply( lambda x: cluster( x['var'], x['object'] ) )
However, this seems sub-optimal: it consumes a lot of RAM and not all cores are used in the computations ( even when specifying the number of cores explicitly in the pandarallel.initialize() method ). Also, the computations are sometimes terminated with various errors, although I have not had a chance to find the reason ( possibly a lack of RAM? ).
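For comparison ( a hedged sketch of my own, not something from the post ), the same per-group fan-out can be done with the standard-library multiprocessing, where the worker count and the pickled payload per task are explicit; _labels below is a hypothetical placeholder for the real cluster() call:

```python
import pandas as pd
from multiprocessing import Pool

def _labels( task ):
    """Worker: hypothetical stand-in for the real SpectralClustering call per group."""
    key, values = task
    # placeholder result -- replace with cluster( values, ... ) in the real pipeline
    return key, len( values )

if __name__ == '__main__':
    demo = pd.DataFrame( { 'group': [0, 0, 1, 1, 1], 'var': [1., 2., 3., 4., 5.] } )
    # only the ( key, ndarray ) payload is pickled to each worker, not the whole frame
    tasks = [ ( k, g['var'].to_numpy() ) for k, g in demo.groupby( 'group' ) ]
    with Pool( processes = 2 ) as pool:          # explicit, predictable worker count
        out = dict( pool.map( _labels, tasks ) )
    print( out )                                 # {0: 2, 1: 3}
```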
I also gave a Spark Pandas UDF a go, although I am totally new to Spark. Here's my attempt:
import findspark; findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )
@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( pdf ):                               # pdf: the pandas DataFrame for one group
    if len( pdf['var'] ) >= 2:
        data = np.asarray( pdf['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters = 5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( { 'id': clustering.labels_ + 1 } )
    else:
        # the returned frame must match the declared schema; use nulls, not np.nan,
        # since the 'id' column is an IntegerType
        return pd.DataFrame( { 'id': [None] * len( pdf ) } )
res = df \
    .groupBy( ['measurement_id', 'time', 'group'] ) \
    .apply( cluster ) \
    .toPandas()
Unfortunately, the performance was unsatisfactory as well. From what I have read on the topic, this may simply be the overhead of a UDF written in Python, with the associated need to convert all Python objects to Spark objects and back.
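Two setup details worth checking before blaming the UDF itself ( hedged suggestions, not verified against the exact Spark version in use ): master("local") runs on a single core, and Spark 2.3+ can use Apache Arrow to cut part of the Python-to-JVM conversion cost mentioned above. A sketch:

```python
from pyspark.sql import SparkSession

# Hypothetical local setup; exact config keys are version-dependent ( Spark 2.x shown ).
spark = (
    SparkSession.builder
    .master( "local[*]" )                                   # all local cores, not just one
    .appName( "test" )
    # Arrow-based pandas <-> Spark conversion ( Spark >= 2.3 ):
    .config( "spark.sql.execution.arrow.enabled", "true" )
    # fewer shuffle partitions than the default 200 for a small local run:
    .config( "spark.sql.shuffle.partitions", "8" )
    .getOrCreate()
)
```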
So here are my questions:
Q : "Could either of my approaches be adjusted to eliminate possible bottlenecks and improve the performance? ( e.g. PySpark setup, adjusting sub-optimal operations etc. )"
+1 for mentioning the setup add-on overhead costs for either strategy of computing. These overheads always create a break-even point, only beyond which a non-[SERIAL] strategy may achieve any beneficial joy of some wished-to-have [TIME]-Domain speedup ( yet only if other, typically [SPACE]-Domain, costs permit or stay feasible; yes, RAM ... the existence of and access to such a sized device, budget, and other similar real-world constraints ).
First,
the pre-flight check, before we take-off

The new, overhead-strict formulation of Amdahl's Law is able to incorporate both of these add-on pSO + pTO overheads and reflects them when predicting the achievable Speedup-levels, including the break-even point, beyond which it may become meaningful ( in a costs/effect, efficiency sense ) to go parallel.

Yet, that is not our core problem here. That comes next:
Next,
given the computational costs of SpectralClustering(), which is going here to use the radial basis function ( RBF ) kernel ~ exp( -gamma * distance( data, data )**2 ), there seems to be no advantage from a split of the data-object over any number of disjunct work-units, as the distance( data, data )-component, by definition, has to visit all the data-elements ( ref. the communication costs of any-to-any value-passing { process | node }-distributed topologies are, for obvious reasons, awfully bad, if not the worst, use-cases for { process | node }-distributed processing, if not straight anti-patterns ( except for some indeed arcane, memory-less / state-less computing fabrics ) ).
For pedantic analysts, yes - add to this ( and we may already call it a bad state ) the costs of the -again- any-to-any k-means post-processing, here about O( N^( 1 + 5 * 5 ) ) ( the worst-case k-means bound O( N^( k * d + 1 ) ) for k = 5 clusters in the d = 5-dimensional spectral embedding ), which, for N ~ len( data ) ~ 1.12E6+, goes awfully against our wish to have some smart and fast processing.
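To put a number on that claim ( a back-of-envelope sketch using the question's sizing parameters ):

```python
# Back-of-envelope: what an any-to-any distance / affinity matrix would cost.
R3, R4 = 541680, 576720
N = R3 + R4
print( N )                               # 1118400  ~ 1.12E6, as cited above

# a dense float64 N x N matrix needs N * N * 8 bytes:
bytes_needed = N * N * 8
print( round( bytes_needed / 1e12, 1 ) ) # ~10.0 TB of RAM -- clearly infeasible to hold
```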
So what?
While the setup costs are not neglected, the increased communication costs will almost for sure disable any improvement from the above-sketched attempts to move from a pure-[SERIAL] process flow into some form of just-[CONCURRENT] or True-[PARALLEL] orchestration of some work-sub-units, due to the increased overheads related to the need to implement ( a tandem pair of ) any-to-any value-passing topologies.
If it weren't for 'em?
Well, this sounds like a Computing-Science oxymoron: even if it were possible, the costs of the any-to-any pre-computed distances ( which would take those immense [TIME]-Domain complexity costs "beforehand" ( Where? How? Is there any other, unavoidable latency permitting a possible latency-masking by some ( so far unknown ) incremental build-up of a complete-in-future any-to-any distance matrix? ) ) would but reposition these principally present costs to some other location in the [TIME]- and [SPACE]-Domains, not reduce 'em.
Q : "Are there any better alternatives?"

The only one I am aware of so far is to try whether the problem can be re-formulated into another, QUBO-formulated, problem fashion ( ref.: Quadratic Unconstrained Binary Optimisation; the good news is that tools for doing so, a base of first-hand knowledge, and practical problem-solving experience exist and grow larger ).
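For readers new to the term, here is a toy, classically brute-forced sketch ( my own illustration; the point data and the 2-cluster setup are hypothetical ) of casting a clustering decision as a QUBO over binary variables:

```python
import itertools
import numpy as np

# Toy QUBO: x_i = 1 puts point i in cluster B, x_i = 0 in cluster A.
# We minimise sum_{i<j} d_ij * [x_i == x_j], i.e. we penalise keeping
# distant points in the same cluster. In QUBO form the same-cluster
# indicator is quadratic in the binaries:
#   [x_i == x_j] = 1 - x_i - x_j + 2 * x_i * x_j
points = np.array( [0.0, 0.1, 0.2, 5.0, 5.1, 5.2] )
n = len( points )
d = np.abs( points[:, None] - points[None, :] )

def qubo_energy( x ):
    e = 0.0
    for i in range( n ):
        for j in range( i + 1, n ):
            same = 1 - x[i] - x[j] + 2 * x[i] * x[j]    # 1 iff x_i == x_j
            e += d[i, j] * same
    return e

# classical brute force over all 2^n assignments ( a QPU samples this instead )
best = min( itertools.product( [0, 1], repeat = n ), key = qubo_energy )
print( best )    # (0, 0, 0, 1, 1, 1) -- the two natural clusters
```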
Q : "How do they compare to the provided solutions in terms of performance?"

The performance is breathtaking: a QUBO-formulated problem has a promising O(1) (!) solver in constant time ( in the [TIME]-Domain ), though somewhat restricted in the [SPACE]-Domain ( where recently announced LLNL tricks may help avoid this physical-world, current-QPU-implementation constraint on problem sizes ).