 

foreach loop becomes inactive for large iterations in R

I have an input csv file with 4500 rows. Each row has a unique ID, and for each row I have to read some data, do some calculations, and write the output to a csv file, so that I end up with 4500 csv files in my output directory. Each output csv file contains a single row of data with 8 columns. Since I have to perform the same calculation on each row of my input csv, I thought I could parallelise this task using foreach. The following is the overall structure of the logic:

 library(doSNOW)
 library(foreach)
 library(data.table)
  
 input_csv <- fread('inputFile.csv')

 # to track the progress of the loop
 iterations <- nrow(input_csv)
 pb <- txtProgressBar(max = iterations, style = 3)
 progress <- function(n) setTxtProgressBar(pb, n)
 opts <- list(progress = progress)

 myClusters <- makeCluster(6)
 registerDoSNOW(myClusters)

 results <- 

     foreach(i = 1:nrow(input_csv), 
     .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
     .errorhandling = 'remove',
     .options.snow = opts) %dopar% 
      
  {
        
       rowRef <- input_csv[i, ]
        
       # read data for the unique location in `rowRef`  
         weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

       # do some calculations
        
       # save the results as csv
        fwrite(temp_result, file.path(paste0('output_iter_',i,'.csv')))
        
       return(temp_result)
 }
  

The above code works fine, but it always gets stuck/goes inactive/does nothing after finishing 25% or 30% of the rows in input_csv. When I watch my output directory, after N% of iterations no new files are being written. I suspect the foreach loop goes into some sleep mode? What I find more confounding is that if I kill the job and re-run the above code, it runs to say 16% or 30% and then goes inactive again, i.e. with each fresh run it "sleeps" at a different progress level.

I can't figure out how to give a minimal reproducible example in this case, but if anyone knows of a checklist I should go through, or of potential issues that could be causing this, that would be really helpful. Thanks

EDIT I am still struggling with this issue. If there is any more information I can provide, please let me know.

EDIT2
My original inputFile contains 213164 rows. So I split the big file into 46 smaller files so that each file has 4634 rows

 library(foreach)
 library(data.table)
 library(doParallel)

myLs <- split(mydat, (as.numeric(rownames(mydat)) - 1) %/% 4634)
 

Then I did this:

for(pr in 1:46){

    input_csv <- myLs[[pr]]

    myClusters <- parallel::makeCluster(6)
    doParallel::registerDoParallel(myClusters)


 results <- 

  foreach(i = 1:nrow(input_csv), 
 .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
 .errorhandling = 'remove',
 .verbose = TRUE) %dopar% 

 {

   rowRef <- input_csv[i, ]

   # read data for the unique location in `rowRef`  
     weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

   # do some calculations

   # save the results as csv
    fwrite(temp_result, file.path(paste0('output_iter_',i,'_',pr,'.csv')))
    gc()
 }

 parallel::stopCluster(myClusters)
 gc()
 }

This too works till, say, the pr = 7 or pr = 8 iteration and then does not proceed, and it also does not generate any error message. I am so confused.

EDIT This is what my CPU usage looks like. I only used 4 cores to generate this image. Would anyone be able to tell me if there's anything in this image that might address my question?

[Screenshot: CPU usage across the 4 cores during the run]

asked Dec 23 '22 by 89_Simple


2 Answers

You could use the progressr package to follow up on memory usage interactively.
For example, with the furrr package:

library(furrr)
library(pryr)
plan(multisession,workers=6)

library(progressr)
handlers("progress")

#input_csv <- fread('inputFile.csv')
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- future_map(filesID, function(fileID) {
    #rowRef <- input_csv[fileID, ]

    # read data for the unique location in `rowRef`
    #weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

    # do some calculations: simulate memory increase
    temp_result <- rnorm(2e7)
    # save the results as csv
    #fwrite(temp_result, file.path(paste0('output_iter_', fileID, '.csv')))

    Sys.sleep(2)
    p(sprintf("memory used=%g", pryr::mem_used()))

    return(object.size(temp_result))
  }, .options = future_options(packages = c("myCustomPkg", "dplyr", "arrow", "zoo", "data.table", "rlist", "stringr")))
})

[====================================================>-------]  90% memory used=6.75075e+08

The same method applies to foreach.
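For example, here is a minimal sketch of a foreach version, assuming the doFuture backend so that the progressr conditions are relayed from the workers (the per-iteration work is again simulated with rnorm):

library(doFuture)
library(foreach)
library(progressr)

registerDoFuture()
plan(multisession, workers = 6)
handlers("progress")

with_progress({
  p <- progressor(steps = 12)
  result <- foreach(fileID = 1:12, .packages = "pryr") %dopar% {
    # simulate the per-iteration work and memory increase
    temp_result <- rnorm(2e7)
    # report progress and the worker's current memory usage
    p(sprintf("memory used=%g", pryr::mem_used()))
    object.size(temp_result)
  }
})
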

Another suggestion is not to return the full results to the main process, since you already store them in a file. Instead of return(temp_result) you could return a small summary, for example object.size, knowing that the complete results can be found in the associated file.
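Applied to the doSNOW loop from the question, that could look roughly like this (a sketch; the placeholder temp_result stands in for whatever your calculation actually produces):

results <-
  foreach(i = 1:nrow(input_csv),
          .packages = c("dplyr", "arrow", "zoo", "data.table"),
          .errorhandling = 'remove',
          .options.snow = opts) %dopar% {

    rowRef <- input_csv[i, ]

    # ... read the weather data and do the calculations producing temp_result ...
    temp_result <- data.table(locationID = rowRef$locationID)  # placeholder result

    # the full result goes to disk ...
    fwrite(temp_result, paste0('output_iter_', i, '.csv'))

    # ... and only a small summary is returned to the main process
    object.size(temp_result)
  }
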

answered Jan 15 '23 by Waldi


From your code it is not entirely possible to see why it should stall. Maybe some parts of your foreach loop are not thread safe (data.table uses multiple threads for subsetting, for example)?
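One cheap way to rule that particular suspect out (a sketch rather than a guaranteed fix) is to pin data.table to a single thread inside each worker with setDTthreads():

results <- foreach(i = 1:nrow(input_csv),
                   .packages = c("data.table", "arrow", "dplyr"),
                   .errorhandling = 'remove') %dopar% {

  # limit data.table to one thread per worker to avoid oversubscription
  # and potential thread-safety issues
  data.table::setDTthreads(1)

  rowRef <- input_csv[i, ]
  # ... the rest of the per-row work from the question, unchanged ...
}
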

As it stands there's very little to change to help out, and @Waldi's answer is likely good at diagnosing the actual problem. The only thing that seems obvious to change here is to avoid iterating over single rows of your data.frame, by utilizing the under-the-hood functionality of foreach.

The way foreach performs parallel programming is by creating an iterator over the object. For parallel programming there will be some overhead between each iteration, as the thread/core will need to request new information. As such it is beneficial to minimize this overhead time by minimizing the number of iterations. We can do this by splitting our dataset up into chunks, or by manually creating an iterator through the iterators package.
I don't have access to your data, so below is a reproducible example using the mtcars dataset. I've split it into a setup block and a foreach block for easier readability. Note that files in my example is a simple vector, so the actual code shown in the question requires some minimal alteration, as files within the foreach loop would then become a data.frame rather than a vector.

Setup

library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))

# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)

Foreach

foreach(files = sfiles, #Note the iterator will name each chunk 'files' within the loop. 
        .packages = c('data.table', 'arrow', 'dplyr'), 
        .combine = c, # Because I return the resulting file names
        .multicombine = TRUE) %dopar% {
  # Iterate over each chunk within foreach
  # Reduces loop overhead
  outF <- character(length(files))
  for(i in seq_along(files)){
    tib <- arrow::read_parquet(files[i])
    # Do some stuff
    tib <- tib %>% select(mpg, hp)
    # Save output
    outF[i] <- tempfile(fileext = '.csv')
    fwrite(tib, outF[i])
  }
  # Return list of output files
  return(outF)
}
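
When the loop is done, remember to release the workers again (mirroring the stopCluster() call in your own code):

parallel::stopCluster(th)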

Now I don't believe this will fix the issue, but it is something that can reduce your overhead slightly.

answered Jan 15 '23 by Oliver