I have an input CSV file with 4500 rows. Each row has a unique ID, and for each row I have to read some data, do some calculations, and write the output to a CSV file, so that I end up with 4500 CSV files in my output directory. An individual output CSV file contains a single row of data with 8 columns.
Since I have to perform the same calculation on each row of my input CSV, I thought I could parallelise this task using foreach. Following is the overall structure of the logic:
library(doSNOW)
library(foreach)
library(data.table)

input_csv <- fread('inputFile.csv')

# to track the progress of the loop
iterations <- nrow(input_csv)
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)

myClusters <- makeCluster(6)
registerDoSNOW(myClusters)

results <-
  foreach(i = 1:nrow(input_csv),
          .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
          .errorhandling = 'remove',
          .options.snow = opts) %dopar%
  {
    rowRef <- input_csv[i, ]

    # read data for the unique location in `rowRef`
    weather <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

    # do some calculations

    # save the results as csv
    fwrite(temp_result, paste0('output_iter_', i, '.csv'))

    return(temp_result)
  }

stopCluster(myClusters)
The above code works fine, but it always gets stuck/inactive/does nothing after finishing 25% or 30% of the rows in input_csv. I keep looking at my output directory: after N% of iterations, no new file is being written. I suspect the foreach loop goes into some sleep mode? What I find more confounding is that if I kill the job and re-run the above code, it says 16% or 30% and then goes inactive again, i.e. with each fresh run it "sleeps" at a different progress level.
I can't figure out how to give a minimal reproducible example in this case, but I thought that if anyone knows of a checklist I should go through, or of potential issues that could be causing this, it would be really helpful. Thanks.
EDIT I am still struggling with this issue. If there is any more information I can provide, please let me know.
EDIT2
My original inputFile contains 213164 rows, so I split the big file into 46 smaller files so that each file has 4634 rows:
library(foreach)
library(data.table)
library(doParallel)
myLs <- split(mydat, (as.numeric(rownames(mydat)) - 1) %/% 4634)
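As a sanity check of that chunking logic (on a toy data frame, not the real input), the resulting chunk sizes can be inspected directly:

```r
# Toy example: 20 rows split into chunks of at most 7 rows each
mydat <- data.frame(x = 1:20)
chunks <- split(mydat, (as.numeric(rownames(mydat)) - 1) %/% 7)

length(chunks)            # number of chunks produced
sapply(chunks, nrow)      # rows per chunk: 7 7 6
```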
Then I did this:
for(pr in 1:46){
  input_csv <- myLs[[pr]]

  myClusters <- parallel::makeCluster(6)
  doParallel::registerDoParallel(myClusters)

  results <-
    foreach(i = 1:nrow(input_csv),
            .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
            .errorhandling = 'remove',
            .verbose = TRUE) %dopar%
    {
      rowRef <- input_csv[i, ]

      # read data for the unique location in `rowRef`
      weather <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

      # do some calculations

      # save the results as csv
      fwrite(temp_result, paste0('output_iter_', i, '_', pr, '.csv'))

      gc()
    }

  parallel::stopCluster(myClusters)
  gc()
}
This too works until, say, the pr = 7 or pr = 8 iteration, and then does not proceed and does not generate any error message. I am so confused.
EDIT This is what my CPU usage looks like. I only used 4 cores to generate this image. Will anyone be able to explain if there's anything in this image that might address my question?
You could use the progressr package to follow memory usage interactively. For example, with the furrr package:
library(furrr)
library(pryr)
library(progressr)

plan(multisession, workers = 6)
handlers("progress")

# input_csv <- fread('inputFile.csv')
# filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- future_map(filesID, function(fileID) {
    # rowRef <- input_csv[fileID, ]

    # read data for the unique location in `rowRef`
    # weather <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

    # do some calculations : simulate memory increase
    temp_result <- rnorm(2e7)

    # save the results as csv
    # fwrite(temp_result, paste0('output_iter_', fileID, '.csv'))

    Sys.sleep(2)
    p(sprintf("memory used=%g", pryr::mem_used()))
    return(object.size(temp_result))
  }, .options = future_options(packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
})
[====================================================>-------] 90% memory used=6.75075e+08
The same method applies to foreach.
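With foreach specifically, the doFuture backend can forward progressr updates from the workers back to the main session. This is a minimal sketch with toy work (Sys.sleep and i^2) standing in for the real calculation:

```r
library(foreach)
library(doFuture)
library(progressr)

registerDoFuture()                 # foreach adaptor backed by futures
plan(multisession, workers = 2)

with_progress({
  p <- progressor(steps = 4)
  res <- foreach(i = 1:4) %dopar% {
    Sys.sleep(0.2)                 # stand-in for the real per-row work
    p(sprintf("i = %d", i))        # report progress from the worker
    i^2
  }
})
```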
Another suggestion is not to return the full results to the main process, as you already store them in a file. Instead of return(temp_result) you could return a summary, for example object.size, knowing that the complete results can be found in the associated file.
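Concretely, the loop body could write the file and hand back only a lightweight summary. This is a sketch with a stand-in temp_result and a temp file path, since the real calculation and output directory aren't shown:

```r
library(data.table)

# Stand-in for the real per-iteration result from the question's loop
temp_result <- data.table(a = 1:3, b = c(0.1, 0.2, 0.3))

out_file <- tempfile(fileext = ".csv")
fwrite(temp_result, out_file)

# Return only a small summary; the full result lives in the file
summary_row <- list(file = out_file, size = object.size(temp_result))
```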
From your code it is not entirely possible to see why it should stall. Maybe some parts of your foreach loop are not thread safe (data.table uses multiple threads for subsetting, for example)?
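If that is the culprit, one mitigation worth trying (an assumption, not a confirmed diagnosis) is to pin data.table to a single thread inside each worker with setDTthreads(), so that 6 workers don't each spawn their own thread pool:

```r
library(foreach)
library(doParallel)
library(data.table)

cl <- parallel::makeCluster(2)
registerDoParallel(cl)

res <- foreach(i = 1:4, .packages = "data.table") %dopar% {
  setDTthreads(1)                          # one thread per worker avoids oversubscription
  dt <- data.table(x = as.numeric(1:10) * i)
  dt[, mean(x)]                            # toy stand-in for the real calculation
}

parallel::stopCluster(cl)
```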
As it stands there's very little to change to help out, and @Waldi's answer is likely good at diagnosing the actual problem. The only thing that seems obvious to change here is to avoid iterating over single rows of your data.frame, by utilizing the under-the-hood functionality of foreach.
The way foreach performs parallel programming is by creating an iterator over the object. For parallel programming there will be some overhead between iterations, as the thread/core will need to request new information. As such it is beneficial to minimize this overhead by minimizing the number of iterations. We can do this by splitting our dataset into chunks, or by manually creating an iterator through the iterators package.
I don't have access to your data, so below is a reproducible example using the mtcars dataset. I've split it into a setup block and a foreach block for easier readability. Note that files in my example is a simple vector, so the actual code shown in the question requires some minimal alteration, as files within the foreach loop now becomes a data.frame rather than a vector.
library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))
# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)
foreach(files = sfiles, #Note the iterator will name each chunk 'files' within the loop.
.packages = c('data.table', 'arrow', 'dplyr'),
.combine = c, # Because I return the resulting file names
.multicombine = TRUE) %dopar% {
# Iterate over each chunk within foreach
# Reduces loop overhead
outF <- character(length(files))
for(i in seq_along(files)){
tib <- arrow::read_parquet(files[i])
# Do some stuff
tib <- tib %>% select(mpg, hp)
# Save output
outF[i] <- tempfile(fileext = '.csv')
fwrite(tib, outF[i])
}
# Return list of output files
return(outF)
}
Now I don't believe this will fix the issue, but it is something that can reduce your overhead slightly.
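For completeness, the same chunking can also be done lazily with an iterator instead of split(). This sketch assumes the itertools package (which provides isplitRows()) is acceptable; each foreach iteration then receives a data.frame of several rows rather than a single row:

```r
library(foreach)
library(itertools)
library(doParallel)

cl <- parallel::makeCluster(2)
registerDoParallel(cl)

# isplitRows() yields the rows of mtcars in 4 chunks, one chunk per iteration
res <- foreach(chunk = isplitRows(mtcars, chunks = 4),
               .combine = rbind) %dopar% {
  # toy stand-in for the real per-chunk work
  data.frame(rows = nrow(chunk), mean_mpg = mean(chunk$mpg))
}

parallel::stopCluster(cl)
```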