 

R - How to replicate rows in a Spark dataframe using sparklyr

Is there a way to replicate the rows of a Spark dataframe using the functions of sparklyr/dplyr?

sc <- spark_connect(master = "spark://####:7077")

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")

This is the desired output, saved into a new Spark tbl:

> df2_tbl
   row1  row2
  <int> <chr>
1     1     A
2     1     A
3     1     A
4     2     B
5     2     B
6     2     B
7     3     C
8     3     C
9     3     C
asked Jun 13 '17 by Igor


People also ask

What is the difference between SparkR and Sparklyr?

Sparklyr provides a range of functions that allow you to access the Spark tools for transforming/pre-processing data. SparkR is basically a tool for running R on Spark. In order to use SparkR, we just import it into our environment and run our code.

What is RStudio Sparklyr?

Sparklyr is an R interface for Apache Spark that allows you to: install and connect to Spark using YARN, Mesos, Livy or Kubernetes; use dplyr to filter and aggregate Spark datasets and streams, then bring them into R for analysis and visualization; and use MLlib, H2O, XGBoost and GraphFrames to train models at scale in Spark.

Can I use Spark with R?

You can connect your R program to a Spark cluster from RStudio, R shell, Rscript or other R IDEs. To start, make sure SPARK_HOME is set in the environment (you can check with Sys.getenv()), load the SparkR package, and call sparkR.session().
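
For illustration, a minimal sketch of that flow (the SPARK_HOME path, master URL and app name below are placeholders, not values from the question):

# Minimal SparkR setup sketch; adjust SPARK_HOME and master to your installation
Sys.setenv(SPARK_HOME = "/path/to/spark")  # or confirm with Sys.getenv("SPARK_HOME")
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", appName = "SparkR-example")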


2 Answers

With sparklyr you can use array and explode as suggested by @Oli:

df_tbl %>% 
  mutate(arr = explode(array(1, 1, 1))) %>%  # one output row per array element
  select(-arr)                               # drop the helper column

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C    
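
Since the question asks for the result saved into a new Spark tbl, the lazy query can be materialized with dplyr's compute(); a minimal sketch (the table name "df2" is arbitrary):

df2_tbl <- df_tbl %>% 
  mutate(arr = explode(array(1, 1, 1))) %>% 
  select(-arr) %>% 
  compute("df2")  # registers and caches the result as a Spark table named "df2"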

The explode expression can also be generalized:

library(rlang)

df_tbl %>%  
  mutate(arr = !!rlang::parse_quo(
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))"),
    env = rlang::current_env()  # newer rlang versions expect env to be supplied
  )) %>% select(-arr)

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C   

where the number of copies is easy to adjust.
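
For example, the pattern could be wrapped in a small helper; a sketch, where the name replicate_rows and its interface are hypothetical, not part of sparklyr:

replicate_rows <- function(tbl, n) {
  # Build "explode(array(1, 1, ..., 1))" with n elements and parse it as a quosure;
  # explode/array are translated to Spark SQL by dbplyr, not evaluated locally
  q <- rlang::parse_quo(
    paste("explode(array(", paste(rep(1, n), collapse = ","), "))"),
    env = rlang::current_env()
  )
  tbl %>% mutate(arr = !!q) %>% select(-arr)
}

df_tbl %>% replicate_rows(3)  # nine rows: three copies of each original row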

answered Sep 23 '22 by Alper t. Turker


The first idea that comes to mind is to use the explode function, since that is exactly what it is meant for in Spark. Yet arrays do not seem to be supported in SparkR (to the best of my knowledge).

> structField("a", "array")
Error in checkType(type) : Unsupported type for SparkDataframe: array

I can however propose two other methods:

  1. A straightforward but not very elegant one:

    head(rbind(df, df, df), n = 30)  # stack three copies, then collect up to 30 rows
    #    row1 row2
    # 1    1    A
    # 2    2    B
    # 3    3    C
    # 4    1    A
    # 5    2    B
    # 6    3    C
    # 7    1    A
    # 8    2    B
    # 9    3    C
    

    Or, with a for loop, for more generality:

    df2 <- df
    for (i in 1:2) df2 <- rbind(df, df2)
    

    Note that this would also work with union (see the sketch after the second method).

  2. The second, more elegant method (because it involves only one Spark operation) is based on a cross join (Cartesian product) with a dataframe of size 3 (or any other number):

    j <- as.DataFrame(data.frame(s = 1:3))
    head(drop(crossJoin(df, j), "s"), n = 100)  # each row of df matches the 3 rows of j
    #    row1 row2
    # 1    1    A
    # 2    1    A
    # 3    1    A
    # 4    2    B
    # 5    2    B
    # 6    2    B
    # 7    3    C
    # 8    3    C
    # 9    3    C
    
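For completeness, here is a sketch of the union variant mentioned in the first method; SparkR's union() appends the rows of one SparkDataFrame to another (unionAll() is the older alias):

df2 <- df
for (i in 1:2) df2 <- union(df2, df)  # three copies of df in total
head(df2, n = 30)
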
answered Sep 25 '22 by Oli