I am loading some data into SparkR (Spark version 1.4.0, running on Fedora 21) and running an algorithm over it that produces three different numbers. The algorithm takes a bunch of parameters, and I want to run it with different parameter settings on the same data. The output should be a data frame (or a CSV list) whose columns are the algorithm parameters and the three numbers my algorithm computes, i.e.
mypar1  mypar2  mypar3  myres1  myres2  myres3
1       1.5     1.2     5.6     8.212   5.9
2       1.8     1.7     5.1     7.78    8.34
would be the output for two different parameter settings. I wrote the script below, which parallelises the runs over different parameter settings: it takes an input file with parameter values as its argument, which for the above example would look like this:
1,1.5,1.2
2,1.8,1.7
so one parameter combination per line.
Here's my problem: instead of getting one row per parameter setting, all the numbers are combined into one long list. The function cv_spark returns a data.frame (basically one row). How can I tell Spark to combine the output of cv_spark into a data frame (i.e. do something like rbind) or a list of lists?
#!/home/myname/Spark/spark-1.4.0/bin/sparkR
library(SparkR)
sparkcontext <- sparkR.init("local[3]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))
cv_spark <- function(indata) {
    cv_params <- strsplit(indata, split=",")[[1]]
    param.par1 = as.integer(cv_params[1])
    param.par2 = as.numeric(cv_params[2])
    param.par3 = as.numeric(cv_params[3])
    predictions <- rep(NA, 1)
    ## here I run some calculation on some data that I load to my SparkR session,
    ## but for illustration purpose I'm just filling up with some random numbers
    mypred = base:::sample(seq(5,10,by=0.01),3)
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
    return(as.data.frame(predictions))
}
args <- commandArgs(trailingOnly=TRUE)
print(paste("args ", args))
cvpar = readLines(args[[1]])
rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=4)
myerr <- SparkR:::flatMap(rdd,cv_spark)
output <- SparkR:::collect(myerr)
print("final output")
print(output)
outfile = "spark_output.csv"
write.csv(output,outfile,quote=FALSE,row.names=FALSE)
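The flattening can be reproduced locally without Spark. The sketch below is plain base R, not SparkR: `one_row` is a hypothetical stand-in for cv_spark, and `unlist(..., recursive = FALSE)` is used as an assumed model of what a flatMap-style operation does to each result (splice it one level down). Because a data.frame is itself a list of columns, splicing it yields its columns as separate elements, whereas wrapping each result in `list()` keeps the row intact.

```r
# Hypothetical stand-in for cv_spark: parses one line, returns a one-row data.frame.
one_row <- function(line) {
  p <- as.numeric(strsplit(line, split = ",")[[1]])
  data.frame(par1 = p[1], par2 = p[2], par3 = p[3])
}

lines <- c("1,1.5,1.2", "2,1.8,1.7")

# map-like: exactly one result per input line.
mapped <- lapply(lines, one_row)

# flatMap-like (assumed model): each result is spliced one level down.
# A data.frame is a list of columns, so its columns become separate elements.
flattened <- unlist(mapped, recursive = FALSE)

# Wrapping each result in list() protects the data.frame from being spliced.
wrapped <- unlist(lapply(lines, function(l) list(one_row(l))), recursive = FALSE)

# One row per parameter setting, as wanted.
combined <- do.call(rbind, wrapped)

print(length(mapped))     # number of per-line results
print(length(flattened))  # number of spliced columns
print(combined)
```

If this model carries over to the RDD API, either a map-style operation or returning `list(as.data.frame(predictions))` from the function would keep one element per parameter setting; that is an untested assumption for SparkR 1.4.0.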
I managed to get what I wanted by using flatMapValues instead of flatMap, and by creating (key, value) pairs from my parameter settings (the key is the line number in my parameter input file and the value is the parameters on that line). Then I call reduceByKey, which leaves one row per key. The modified script looks like this:
#!/home/myname/Spark/spark-1.4.0/bin/sparkR
library(SparkR)
sparkcontext <- sparkR.init("local[4]","cvspark",sparkEnvir=list(spark.executor.memory="1g"))
cv_spark <- function(indata) {
    cv_params <- unlist(strsplit(indata[[1]], split=","))
    param.par1 = as.integer(cv_params[1])
    param.par2 = as.numeric(cv_params[2])
    ## as.numeric rather than as.integer, so that a value such as 1.2 is not truncated to 1
    param.par3 = as.numeric(cv_params[3])
    predictions <- rep(NA, 1)
    ## here I run some calculation on some data that I load to my SparkR session,
    ## but for illustration purpose I'm just filling up with some random numbers
    mypred = base:::sample(seq(5,10,by=0.01),3)
    predictions <- cbind(param.par1, param.par2, param.par3,mypred[1],mypred[2],mypred[3])
    return(as.data.frame(predictions))
}
args <- commandArgs(trailingOnly=TRUE)
print(paste("args ", args))
cvpar = readLines(args[[1]])
## Creates (key, value) pairs
cvpar <- Map(list,seq(1,length(cvpar)),cvpar)
rdd <- SparkR:::parallelize(sparkcontext, coll=cvpar, numSlices=1)
myerr <- SparkR:::flatMapValues(rdd,cv_spark)
myerr <- SparkR:::reduceByKey(myerr,"c", 2L)
output <- SparkR:::collect(myerr)
myres <- sapply(output,`[`,2)
df_res <- do.call("rbind",myres)
colnames(df_res) <- c("Element","sigdf","sigq","err","err.sse","err.mse")
outfile = "spark_output.csv"
write.csv(df_res,outfile,quote=FALSE,row.names=FALSE)
This works as expected: the output is a data frame (or CSV file) with the same number of rows as the input file to the above script (i.e. one row per parameter configuration). Perhaps there is a more efficient way to do this, though.
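The final assembly step can be tried on its own in plain R. In the sketch below, `output` is a hand-built stand-in for the collected (key, value) pairs; its shape (a list of two-element lists, each holding a key and a numeric vector of six values) is an assumption, and the numbers are the example rows from the top of the post. Since collected pairs are not guaranteed to come back in input order, the rows are sorted by key before being stacked.

```r
# Hand-built stand-in for the collected (key, value) pairs (assumed shape).
# The pairs are deliberately out of order to show the re-sorting step.
output <- list(
  list(2L, c(2, 1.8, 1.7, 5.1, 7.78, 8.34)),
  list(1L, c(1, 1.5, 1.2, 5.6, 8.212, 5.9))
)

# Split the pairs into keys (input line numbers) and values (result rows).
keys   <- vapply(output, function(kv) kv[[1]], integer(1))
values <- lapply(output, function(kv) kv[[2]])

# Restore input-file order, then stack one row per key.
df_res <- as.data.frame(do.call(rbind, values[order(keys)]))
colnames(df_res) <- c("mypar1", "mypar2", "mypar3", "myres1", "myres2", "myres3")

print(df_res)
```

Using `vapply` with `[[` instead of `sapply` with `[` keeps the extracted types explicit, which makes a malformed pair fail early rather than silently changing the shape of the result.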