sampling with weight using pyspark

I have an unbalanced dataframe in Spark using PySpark. I want to resample it to make it balanced. All I can find in PySpark is the sample function:

sample(withReplacement, fraction, seed=None)

but I want to sample the dataframe weighted by unitvolume. In Python (pandas) I can do it like:

df.sample(n, replace=False, weights=np.log(df["unitvolume"]))

Is there any way to do the same using PySpark?

Xin Chang asked Oct 16 '22
1 Answer

Spark provides tools for stratified sampling, but they work only on categorical data. You could try to bucketize the weight column first:

from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col, log

df_log = df.withColumn("log_unitvolume", log(col("unitvolume")))
splits = ... # A list of split points (see the sketch below)

bucketizer = Bucketizer(splits=splits, inputCol="log_unitvolume", outputCol="bucketed_log_unitvolume")

df_log_bucketed = bucketizer.transform(df_log)
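If you don't have natural split points, one possible way to get them (a sketch, not part of the original answer) is to take approximate quantiles of the log-weight column:

# Hypothetical derivation of `splits` from approximate quantiles;
# the relative error of 0.01 is an arbitrary choice.
quantiles = df_log.approxQuantile("log_unitvolume", [0.0, 0.25, 0.5, 0.75, 1.0], 0.01)

# Bucketizer needs strictly increasing splits; dedupe them and pad with
# infinities so every value lands in some bucket.
splits = [float("-inf")] + sorted(set(quantiles)) + [float("inf")]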

Compute statistics:

counts = df_log_bucketed.groupBy("bucketed_log_unitvolume").count()
fractions = ...  # Define a sampling fraction for each bucket
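If the goal is a balanced result, one possible recipe (an assumption, since the answer leaves fractions open) is to downsample every bucket to the size of the smallest one:

# Collect per-bucket counts and derive a sampling fraction for each
# bucket so that all buckets end up roughly the same size.
rows = counts.collect()
min_count = min(row["count"] for row in rows)
fractions = {row["bucketed_log_unitvolume"]: min_count / row["count"] for row in rows}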

and use these for sampling:

df_log_bucketed.sampleBy("bucketed_log_unitvolume", fractions)

You can also try to rescale log_unitvolume to the [0, 1] range and then keep each row with probability equal to its rescaled weight:

from pyspark.sql.functions import rand

# Keep each row with probability equal to its rescaled weight in [0, 1]
df_log_rescaled.where(rand() < col("log_unitvolume_rescaled"))
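For completeness, here is one way df_log_rescaled could be built (a sketch; this step is left implicit above), using plain min-max rescaling:

from pyspark.sql.functions import col, max as max_, min as min_

# Min-max rescale log_unitvolume into [0, 1]; the resulting column is
# then usable as a per-row keep probability.
bounds = df_log.agg(min_("log_unitvolume").alias("lo"),
                    max_("log_unitvolume").alias("hi")).first()

df_log_rescaled = df_log.withColumn(
    "log_unitvolume_rescaled",
    (col("log_unitvolume") - bounds["lo"]) / (bounds["hi"] - bounds["lo"]))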
Alper t. Turker answered Oct 23 '22