I'm new to Spark, SparkR and generally all HDFS-related technologies. I recently installed Spark 1.5.0 and ran some simple code with SparkR:
Sys.setenv(SPARK_HOME="/private/tmp/spark-1.5.0-bin-hadoop2.6")
.libPaths("/private/tmp/spark-1.5.0-bin-hadoop2.6/R/lib")
require('SparkR')
require('data.table')
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
hiveContext <- sparkRHive.init(sc)
n = 1000
x = data.table(id = 1:n, val = rnorm(n))
Sys.time()
xs <- createDataFrame(sqlContext, x)
Sys.time()
The code executes almost immediately. However, when I change it to n = 1000000, it takes about 4 minutes (the time between the two Sys.time() calls). When I check these jobs in the console on port :4040, the job for n = 1000 has a duration of 0.2 s, and the job for n = 1000000 of 0.3 s. Am I doing something wrong?
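(As a side note, base R's system.time() captures the same elapsed time in a single call; a minimal sketch of the same measurement, assuming the setup above:)
# Wall-clock seconds spent in createDataFrame, equivalent to diffing the two Sys.time() calls
system.time(xs <- createDataFrame(sqlContext, x))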
You're not doing anything particularly wrong. It is just the effect of a combination of different factors:
createDataFrame, as it is currently (Spark 1.5.1) implemented, is slow. It is a known issue described in SPARK-8277.
The current implementation does not play well with data.table.
Until SPARK-8277 is resolved there is not much you can do, but there are two options you can try:
Use a plain old data.frame instead of data.table. Using the flights dataset (227496 rows, 14 columns):
df <- read.csv("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=3)
## Unit: seconds
## expr min lq mean median
## createDataFrame(sqlContext, df) 96.41565 97.19515 99.08441 97.97465
## uq max neval
## 100.4188 102.8629 3
compared to data.table:
dt <- data.table::fread("flights.csv")
microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=3)
## Unit: seconds
## expr min lq mean median
## createDataFrame(sqlContext, dt) 378.8534 379.4482 381.2061 380.043
## uq max neval
## 382.3825 384.722 3
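Applied to the question's example, a minimal sketch of this first workaround (assuming the x and sqlContext defined above) is just a conversion before the call:
# Convert the data.table to a plain data.frame before handing it to createDataFrame
x_df <- as.data.frame(x)
xs <- createDataFrame(sqlContext, x_df)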
Write to disk and use spark-csv to load the data directly into a Spark DataFrame, without direct interaction with R. As crazy as it sounds:
dt <- data.table::fread("flights.csv")

write_and_read <- function() {
  # Dump the R object to a temporary CSV, then let spark-csv read it back
  path <- tempfile(fileext = ".csv")
  write.csv(dt, path, row.names = FALSE)
  read.df(sqlContext, path,
          source = "com.databricks.spark.csv",
          header = "true",
          inferSchema = "true")
}

microbenchmark::microbenchmark(write_and_read(), times = 3)
## Unit: seconds
## expr min lq mean median
## write_and_read() 2.924142 2.959085 2.983008 2.994027
## uq max neval
## 3.01244 3.030854 3
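For the question's data, the same pattern would look roughly like the sketch below (assuming spark-csv is already on the Spark classpath, e.g. pulled in with the --packages option at launch time; the path is a temporary file):
# Round-trip the question's data.table through a temporary CSV
# and let spark-csv infer the schema on the Spark side
path <- tempfile(fileext = ".csv")
write.csv(x, path, row.names = FALSE)
xs <- read.df(sqlContext, path,
              source = "com.databricks.spark.csv",
              header = "true",
              inferSchema = "true")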
I am not really sure whether it makes sense to push data that can be handled in R to Spark in the first place, but let's not dwell on that.
Edit:
This issue should be resolved by SPARK-11086 in Spark 1.6.0.