Is there a way to use period.apply with doParallel and foreach in xts?

I'd like to parallelize a period.apply call in R. I'm trying to use doParallel with foreach, but I don't know how to implement it. The data I'm using is an xts object with a date-time index and the values of one variable, and what I'm trying to do is compute the mean of the data every 5 seconds:

                                     VAR
2018-01-01 00:00:00                1945.054
2018-01-01 00:00:02                1944.940
2018-01-01 00:00:05                1945.061
2018-01-01 00:00:07                1945.255
2018-01-01 00:00:10                1945.007
2018-01-01 00:00:12                1944.995

Here is a sample of the code I wrote, but it doesn't work:

library(xts)
library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

ends <- endpoints(x,"secs",5)
m <- foreach(i = 1:length(index(x))) %dopar% period.apply(x,ends,mean)
index(m) <- foreach(m) %dopar% trunc(index(m),"secs")
stopCluster()

The following code works, but for a much bigger dataset it takes too much time:

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

Is there a way of doing this more efficiently?

Thanks in advance.

Asked by Riverarodrigoa

2 Answers

Have you tried your code on a small data set? Once I got it to run, it did all the work multiple times (once for each row in x). Also, if you try to parallelise work, it is usually a good idea to let each worker do as much work as possible before sending the data back. In your code you have two successive foreach calls, which leads to additional communication overhead.
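
To see the duplication concretely, here is a minimal sketch on toy data (it uses %do%, so no cluster is needed; the values are made up for illustration):

library(xts)
library(foreach)

toy <- xts(1:6, order.by = as.POSIXct("2018-01-01") + c(0, 2, 5, 7, 10, 12))
toy_ends <- endpoints(toy, "secs", 5)
res <- foreach(i = seq_along(index(toy))) %do% period.apply(toy, toy_ends, mean)
length(res)                            # 6 -- one full result per row of toy
all(sapply(res, identical, res[[1]]))  # TRUE -- every element is the same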

My approach is like this:

  1. Split the xts object into N chunks, making sure that we split at one of the 5-second intervals.
  2. Let each worker do all the work for one chunk.
  3. Combine the results.

How to choose N?

Since split.xts is used for step one, each chunk covers the same number of 5-second intervals. However, the amount of work to be done probably depends more on the number of data points than on the number of intervals. So if the points are distributed unevenly across the chunks, it might make sense to use a larger number of chunks together with some load balancing. If the distribution of points is even, it makes sense to make N as small as possible (but no smaller than the number of cores) to minimize the communication overhead. Here I take the latter approach and set N equal to the number of cores.

Now let's generate some sample data and apply your working solution:

library(xts)
x <- xts(x = runif(100),
         order.by = as.POSIXct("2018-01-01") + 0:99)

ends <- endpoints(x,"secs",5)
m <- period.apply(x, ends, mean)
index(m) <- trunc(index(m),"secs")

Next we set up the parallel cluster:

library(doParallel)
library(foreach)

cores <- detectCores()
cluster <- makeCluster(cores, type = "PSOCK")
registerDoParallel(cluster)

Now we have to split the xts object. I first determine the time span of the entire object and from that the number k of 5-second intervals each of the N chunks should cover.

N <- cores
k <- as.integer(ceiling(difftime(max(index(x)), min(index(x)), units = "secs") / (5 * N)))

Next I split the xts object into a list of xts objects, each covering about the same time span:

split_x <- split(x, f = "secs", k = 5 * k)
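
As discussed above, if the points were distributed unevenly, one could instead create more, smaller chunks so that there is room for load balancing. A minimal sketch of that variant, where the factor 4 is an arbitrary choice:

k_lb <- as.integer(ceiling(difftime(max(index(x)), min(index(x)),
                                    units = "secs") / (5 * 4 * cores)))
split_x_lb <- split(x, f = "secs", k = 5 * k_lb)

For the evenly spaced sample data here, I stick with split_x.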

Now I let foreach iterate over these chunks and combine the results:

m2 <- foreach(x = split_x, .packages = c("xts"), .combine = c) %dopar% {
    ends <- endpoints(x,"secs",5)
    m <- period.apply(x, ends, mean)
    index(m) <- trunc(index(m),"secs")
    m
}
stopCluster(cluster)

Hooray, results are equal:

all.equal(m, m2)
#> [1] TRUE

Answered by Ralf Stubner

I was really depressed by the performance of period.apply() illustrated in this question. My depression became an obsession to make it faster. So I rewrote it in C. Here's an example that uses it and shows the performance improvement.

library(xts)  # need the GitHub development version
period_apply <- xts:::period_apply  # not exported

set.seed(21)
x <- .xts(rnorm(1e7), 1:1e7)
e <- endpoints(x, "seconds", 5)

system.time(y <- period.apply(x, e, sum))  # current version
#    user  system elapsed 
#  77.904   0.368  78.462 
system.time(z <- period_apply(x, e, sum))  # new C version
#    user  system elapsed 
#  15.468   0.232  15.741
all.equal(y, z)
# [1] TRUE

So it's ~5x faster for this example. There are still a few things that could make it even faster, but 5x was a good place to stop and show it could be better. Check out the latest development version if you want (and are brave enough) to try it.
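
For reference, one way to install the development version from GitHub is via the remotes package (a sketch; assumes remotes is already installed):

# install the development version of xts from GitHub
remotes::install_github("joshuaulrich/xts")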

Answered by Joshua Ulrich