
Is sample_n really a random sample when used with sparklyr?

I have 500 million rows in a Spark DataFrame. I'm interested in using sample_n from dplyr because it lets me specify the sample size explicitly. If I were to use sparklyr::sdf_sample(), I would first have to calculate sdf_nrow(), then compute the fraction sample_size / nrow, and then pass that fraction to sdf_sample(). This isn't a big deal, but sdf_nrow() can take a while to complete.

So, it would be ideal to use dplyr::sample_n() directly. However, after some testing, it doesn't look like sample_n() is random. In fact, the results are identical to head()! It would be a major issue if instead of sampling rows at random, the function were just returning the first n rows.

Can anyone else confirm this? Is sdf_sample() my best option?
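For reference, the sdf_sample() workaround described in the question could be sketched roughly as follows. The helper names sample_fraction and sample_approx_n are hypothetical and not part of sparklyr. Because sdf_sample() takes a fraction, the result contains approximately n rows, not exactly n.

```r
library(sparklyr)
library(dplyr)

# Hypothetical helper: fraction needed to expect about n rows out of `total`.
# Capped at 1 so that asking for more rows than exist keeps everything.
sample_fraction <- function(n, total) {
  stopifnot(n >= 0, total > 0)
  min(1, n / total)
}

# Hypothetical helper: roughly n random rows from a Spark table.
# sdf_nrow() triggers a full count, which is the slow step on large data.
sample_approx_n <- function(tbl, n, seed = NULL) {
  total <- sdf_nrow(tbl)
  sdf_sample(tbl,
             fraction = sample_fraction(n, total),
             replacement = FALSE,
             seed = seed)
}
```

With the data loaded as in the code that follows, this would be called as spark_data %>% sample_approx_n(300).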

# install.packages("gapminder")

library(gapminder)
library(sparklyr)
library(dplyr)   # provides sample_n(), sample_frac(), summarise(), collect(), pull()
library(purrr)

sc <- spark_connect(master = "yarn-client")

spark_data <- sdf_import(gapminder, sc, "gapminder")


> # Appears to be random
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    58.83397


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    60.31693


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.38692
> 
> 
> # Appears to be random
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    60.48903


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.44187


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    59.27986
> 
> 
> # Does not appear to be random
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source:   lazy query [?? x 1]
# Database: spark_connection
  sample_mean
        <dbl>
1    57.78434
> 
> 
> 
> # === Test sample_n() ===
> sample_mean <- list()
> 
> for(i in 1:20){
+   
+   sample_mean[i] <- spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp)) %>% collect() %>% pull()
+   
+ }
> 
> 
> sample_mean %>% flatten_dbl() %>% mean()
[1] 57.78434
> sample_mean %>% flatten_dbl() %>% sd()
[1] 0
> 
> 
> # === Test head() ===
> spark_data %>% 
+   head(300) %>% 
+   pull(lifeExp) %>% 
+   mean()
[1] 57.78434
asked Jul 24 '18 15:07 by kputschko


1 Answer

It is not. If you check the execution plan (optimizedPlan function as defined here) you'll see it is just a limit:

spark_data %>% sample_n(300) %>% optimizedPlan()
<jobj[168]>
  org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
  GlobalLimit 300
+- LocalLimit 300
   +- InMemoryRelation [country#151, continent#152, year#153, lifeExp#154, pop#155, gdpPercap#156], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `gapminder`
         +- Scan ExistingRDD[country#151,continent#152,year#153,lifeExp#154,pop#155,gdpPercap#156] 
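The linked definition of optimizedPlan is not reproduced on this page. One way such a helper could be written, assuming it only needs to reach the Catalyst plan through sparklyr's invoke() interface, is sketched below. This is an illustrative reconstruction and not necessarily the definition the answer links to.

```r
library(sparklyr)
library(dplyr)

# Assumed sketch: get the underlying Spark Dataset for a tbl_spark,
# then ask its QueryExecution for the optimized logical plan.
optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%
    invoke("queryExecution") %>%
    invoke("optimizedPlan")
}
```

The returned object is a jobj reference to the JVM plan, which prints as a tree like the one above.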

This is further confirmed by show_query:

spark_data %>% sample_n(300) %>% show_query()
<SQL>
SELECT *
FROM (SELECT *
FROM `gapminder` TABLESAMPLE (300 rows) ) `hntcybtgns`

and by the visualized execution plan:

[Image: TABLESAMPLE(n ROWS) plan]

Finally, if you check the Spark source, you'll see that this case is implemented as a simple LIMIT:

case ctx: SampleByRowsContext =>
  Limit(expression(ctx.expression), query)

I believe these semantics were inherited from Hive, where the equivalent query takes the first n rows from each input split.

In practice, getting a sample of an exact size is very expensive, and you should avoid it unless strictly necessary (the same applies to large LIMITs).
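If an exact size really is required, one common approach (not taken from the answer above) is to attach a random key to every row, sort by it, and keep the first n rows. A minimal sketch, assuming that rand() is passed through by dbplyr to Spark SQL and that the table has no existing column called rand_key; the helper name sample_exact_n is hypothetical:

```r
library(sparklyr)
library(dplyr)

# Hypothetical helper: exactly n rows (or all rows, if fewer exist),
# chosen uniformly at random without replacement.
# rand() is not evaluated in R; it is translated to Spark SQL's rand().
sample_exact_n <- function(tbl, n) {
  tbl %>%
    mutate(rand_key = rand()) %>%   # one uniform random key per row
    arrange(rand_key) %>%           # order the rows by that key
    head(n) %>%                     # keep the n smallest keys
    select(-rand_key)               # drop the helper column
}
```

This scans and orders the whole table, which illustrates why exact-size sampling is costly. When an approximate size is acceptable, sdf_sample() with a fraction avoids the sort.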

answered Oct 18 '22 03:10 by zero323