Changing the number of cores while parallel computing in R

I'm executing parallelized code in R using the parallel package and mclapply, which takes a predefined number of cores as a parameter (mc.cores).

If I have a job that is going to run for a couple of days, is there a way to write (or wrap) my mclapply call so that it uses fewer cores during peak server hours and ramps up the usage in off-peak hours?

Megatron asked Oct 19 '22 00:10


1 Answer

The easiest solution is probably to split your data into smaller chunks and run mclapply separately on each chunk. You can then set the number of cores anew for each mclapply call. This probably works best for calculations whose run times vary little: each mclapply call returns only when its slowest element has finished, so cores that finish early sit idle until the next chunk starts.

Here is a quick-and-dirty mock-up of how this could look:

library(parallel)
library(lubridate)

#you would have to come up with your own function
#for the number of cores to be used
determine_cores=function(hh) {
  #hh will be the hour of the day
  if (hh>17 || hh<9) {  #off-peak: before 9:00 or from 18:00 on
    return(4)
  } else {
    return(2)
  }
}

#prepare some sample data
set.seed(1234)
myData=lapply(seq(1e-1,1,1e-1),function(x) rnorm(1e7,0,x))

#calculate SD with mclapply WITHOUT splitting of data into chunks
#we need this for comparison
compRes=mclapply(myData,function(x) sd(x),mc.cores=4)

set.seed(1234)
#this will hold the results of the separate mclapply calls
res=list()
#starting position within myData
chunk_start_pos=1
calc_flag=TRUE

while(calc_flag) {
  #use the function defined above to determine how many cores we may use
  core_num=determine_cores(lubridate::hour(Sys.time()))
  #determine end position of data chunk
  chunk_end_pos=chunk_start_pos+core_num-1
  if (chunk_end_pos>=length(myData)) {
    chunk_end_pos=length(myData)
    calc_flag=FALSE
  }
  message("Calculating elements ",chunk_start_pos," to ",chunk_end_pos)
  #mclapply call on data chunk
  #store data in res
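  #index up to chunk_end_pos (not chunk_start_pos+core_num-1) so the last
  #chunk never reaches past the end of myData (that would yield NULL elements)
  res[[length(res)+1]]=mclapply(myData[chunk_start_pos:chunk_end_pos],
                                function(x) sd(x),
                                mc.preschedule=FALSE,  #fork one job per element
                                mc.cores=core_num)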
  #calculate new start position
  chunk_start_pos=chunk_start_pos+core_num
}

#let's compare the results
all.equal(compRes,unlist(res,recursive=FALSE))
#TRUE
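Since the question also asks about wrapping mclapply, the same chunking idea can be packaged as a reusable function. This is only a sketch under some assumptions: mclapply_adaptive and cores_for_hour are hypothetical names (not from any package), the 9:00 to 17:59 peak window and the 2/4 core counts simply mirror determine_cores() above, and the hour is read with base R's as.POSIXlt(Sys.time())$hour instead of lubridate.

```r
library(parallel)

# Hypothetical helper (not from any package): pick a core count from the hour of the day.
# Assumed peak window 09:00-17:59 and core counts 2/4, mirroring determine_cores() above.
cores_for_hour <- function(hh, peak_cores = 2, offpeak_cores = 4) {
  if (hh >= 9 && hh <= 17) peak_cores else offpeak_cores
}

# Hypothetical wrapper: runs mclapply on consecutive chunks of X and calls
# cores_fun() before every chunk, so the core count can change during a long job.
# chunk_mult = number of elements per core in each chunk.
mclapply_adaptive <- function(X, FUN, ...,
                              cores_fun = function() cores_for_hour(as.POSIXlt(Sys.time())$hour),
                              chunk_mult = 1) {
  n <- length(X)
  res <- vector("list", n)
  start <- 1
  while (start <= n) {
    cores <- max(1L, as.integer(cores_fun()))
    end <- min(n, start + cores * chunk_mult - 1)  # never index past the end of X
    idx <- start:end
    message("Calculating elements ", start, " to ", end, " on ", cores, " cores")
    res[idx] <- mclapply(X[idx], FUN, ...,
                         mc.preschedule = FALSE,  # one forked job per element
                         mc.cores = cores)
    start <- end + 1
  }
  names(res) <- names(X)
  res
}

# Usage: same computation as the mock-up, on smaller sample data
set.seed(1234)
myData <- lapply(seq(0.1, 1, 0.1), function(s) rnorm(1e5, 0, s))
sds <- mclapply_adaptive(myData, sd)
```

Passing the schedule in as cores_fun keeps the wrapper independent of the peak-hour rule. A larger chunk_mult means fewer rechecks of the core count, but the job reacts more slowly when peak hours begin. As with any mclapply call, mc.cores > 1 relies on forking and is not supported on Windows.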
cryo111 answered Oct 21 '22 17:10