Is there a way to replicate the rows of a Spark dataframe using sparklyr/dplyr functions?
sc <- spark_connect(master = "spark://####:7077")
df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")
This is the desired output, which should be saved into a new Spark tbl:
> df2_tbl
row1 row2
<int> <chr>
1 1 A
2 1 A
3 1 A
4 2 B
5 2 B
6 2 B
7 3 C
8 3 C
9 3 C
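For reference, on a local data frame the desired result would just be base-R row indexing; the question is how to get the same effect on the Spark side:
df <- data.frame(row1 = 1:3, row2 = LETTERS[1:3])
df[rep(seq_len(nrow(df)), each = 3), ]   # each row repeated 3 times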
With sparklyr you can use array and explode as suggested by @Oli:
# explode() emits one output row per array element; the helper column is dropped afterwards
df_tbl %>%
  mutate(arr = explode(array(1, 1, 1))) %>%
  select(-arr)
# # Source: lazy query [?? x 2]
# # Database: spark_connection
# row1 row2
# <int> <chr>
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
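If the result should end up as a new Spark tbl (like df2_tbl in the question), one way, sketched here with the illustrative table name "df2", is to force the lazy query with dplyr's compute() or register it with sparklyr's sdf_register():
df2_tbl <- df_tbl %>%
  mutate(arr = explode(array(1, 1, 1))) %>%
  select(-arr) %>%
  compute("df2")   # or sdf_register(., "df2") to register a temporary view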
The same approach can be generalized:
library(rlang)
df_tbl %>%
  mutate(arr = !!rlang::parse_quo(
    # build "explode(array( 1,1,1 ))" as a string and parse it into a quosure
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))"),
    env = rlang::caller_env()
  )) %>%
  select(-arr)
# # Source: lazy query [?? x 2]
# # Database: spark_connection
# row1 row2
# <int> <chr>
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
where the number of copies per row is easy to adjust.
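If this is needed repeatedly, the same idea can be wrapped in a small helper (a sketch; the name replicate_rows and its interface are hypothetical, with rlang loaded as above):
# Repeat every row of a Spark tbl n times using the explode(array(...)) trick
replicate_rows <- function(sdf, n) {
  q <- rlang::parse_quo(
    paste("explode(array(", paste(rep(1, n), collapse = ","), "))"),
    env = rlang::caller_env()
  )
  sdf %>% mutate(arr = !!q) %>% select(-arr)
}
df_tbl %>% replicate_rows(3)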
With SparkR, the idea that comes to mind first is to use the explode function (that is exactly what it is meant for in Spark). Yet arrays do not seem to be supported in SparkR (to the best of my knowledge).
> structField("a", "array")
Error in checkType(type) : Unsupported type for SparkDataframe: array
I can however propose two other methods:
A straightforward but not very elegant one:
head(rbind(df, df, df), n = 30)   # stack three copies; head() collects up to 30 rows for display
# row1 row2
# 1 1 A
# 2 2 B
# 3 3 C
# 4 1 A
# 5 2 B
# 6 3 C
# 7 1 A
# 8 2 B
# 9 3 C
Or, with a for loop, for more generality:
df2 <- df
for (i in 1:2) df2 <- rbind(df, df2)   # appends two more copies, three in total
Note that this would also work with union().
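For example, with SparkR's union() (unionAll() on Spark versions before 2.0), the loop becomes:
df2 <- df
for (i in 1:2) df2 <- union(df2, df)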
The second, more elegant method (because it involves only one Spark operation) is based on a cross join (Cartesian product) with a dataframe of size 3 (or any other number):
j <- as.DataFrame(data.frame(s = 1:3))        # 3-row helper SparkDataFrame
head(drop(crossJoin(df, j), "s"), n = 100)    # Cartesian product, then drop the helper column
# row1 row2
# 1 1 A
# 2 1 A
# 3 1 A
# 4 2 B
# 5 2 B
# 6 2 B
# 7 3 C
# 8 3 C
# 9 3 C
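For completeness, the same cross-join idea can be sketched on the sparklyr side by joining on a constant helper column (the table name "replicator" and the column names are illustrative; depending on the Spark version, the resulting Cartesian product may require spark.sql.crossJoin.enabled):
replicator <- copy_to(sc, data.frame(s = 1:3, dummy = 1), "replicator", overwrite = TRUE)
df_tbl %>%
  mutate(dummy = 1) %>%
  inner_join(replicator, by = "dummy") %>%   # every row matches all 3 helper rows
  select(-dummy, -s)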