
data.table and parallel computing

Following this post: multicore and data.table in R, I was wondering if there is a way to use all cores when using data.table. Calculations by group, in particular, look like they could be parallelized. It seems that plyr allows such operations by design.

statquant asked Feb 07 '13 19:02



2 Answers

First thing to check is that data.table FAQ 3.1 point 2 has sunk in :

One memory allocation is made for the largest group only, then that memory is reused for the other groups. There is very little garbage to collect.

That's one reason data.table grouping is quick. But this approach doesn't lend itself to parallelization. Parallelizing means copying the data to the other threads instead, which costs time. My understanding is that data.table grouping is usually faster than plyr with .parallel on anyway. It depends on the computation time of the task for each group, and on whether that compute time can easily be reduced. Moving the data around often dominates (when benchmarking 1 or 3 runs of large data tasks).

More often, so far, it's actually some gotcha that's biting in the j expression of [.data.table. For example, recently we saw poor performance from data.table grouping but the culprit turned out to be min(POSIXct) (Aggregating in R over 80K unique ID's). Avoiding that gotcha yielded over 50 times speedup.
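As an illustration of sidestepping this kind of gotcha, here is a minimal sketch. It assumes the cost came from min() being dispatched on the POSIXct class once per group; the table DT and its columns id and ts are made up for the example, and no timing is claimed, since that depends on the data.table version. The idea is to aggregate the underlying numeric seconds and restore the class once at the end.

```r
library(data.table)

set.seed(1)
# Hypothetical data: 10,000 timestamps spread over up to 1,000 ids.
DT <- data.table(
  id = sample(1000L, 1e4, replace = TRUE),
  ts = as.POSIXct("2013-01-01", tz = "UTC") + sample(1e6, 1e4)
)

# Direct form: min() is applied to a POSIXct vector in every group.
direct <- DT[, .(first = min(ts)), by = id]

# Workaround sketch: take the minimum of the plain numeric seconds,
# then convert the whole result column back to POSIXct in one step.
via_numeric <- DT[, .(first = min(as.numeric(ts))), by = id]
via_numeric[, first := as.POSIXct(first, origin = "1970-01-01", tz = "UTC")]

# Both forms should give the same earliest timestamp per id.
all.equal(as.numeric(direct$first), as.numeric(via_numeric$first))
```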

So the mantra is: Rprof, Rprof, Rprof.
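For anyone who hasn't used it, a minimal sketch of profiling a grouped call with Rprof follows. The dummy table DT and the choice of quantile() as the per-group task are assumptions for illustration only; the point is the Rprof() / summaryRprof() pattern from base R's utils package.

```r
library(data.table)

set.seed(1)
# Hypothetical dummy data: 20,000 groups of 10 rows each.
DT <- data.table(grp = rep(seq_len(20000L), each = 10L), x = rnorm(2e5))

prof_file <- tempfile(fileext = ".out")
Rprof(prof_file, interval = 0.01)   # start sampling the call stack
res <- DT[, .(q90 = quantile(x, 0.9)), by = grp]
Rprof(NULL)                         # stop profiling

prof <- summaryRprof(prof_file)
head(prof$by.self, 10)              # functions ranked by their own (self) time
```

If most of the self time sits in functions called from j rather than in [.data.table itself, the j expression is the place to look first.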

Further, point 1 from the same FAQ might be significant :

Only that column is grouped, the other 19 are ignored because data.table inspects the j expression and realises it doesn’t use the other columns.

So, data.table really doesn't follow the split-apply-combine paradigm at all. It works differently. split-apply-combine lends itself to parallelization but it really doesn't scale to large data.
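To make the contrast concrete, here is a small sketch with made-up data (the table DT and columns grp, v1, v2, v3 are assumptions). The first form splits every column into per-group pieces before applying a function; the second lets data.table group directly, with j referring only to v1.

```r
library(data.table)

set.seed(42)
DT <- data.table(
  grp = sample(letters[1:5], 1000L, replace = TRUE),
  v1 = rnorm(1000L), v2 = rnorm(1000L), v3 = rnorm(1000L)
)

# Split-apply-combine: all columns are copied into one piece per group,
# the function is applied to each piece, and the results are bound together.
pieces <- split(DT, by = "grp")
sac <- rbindlist(lapply(pieces, function(d) {
  data.table(grp = d$grp[1L], total = sum(d$v1))
}))

# data.table grouping: j uses only v1, so v2 and v3 are never touched.
direct <- DT[, .(total = sum(v1)), by = grp]

# Same totals once the rows are put in the same order.
all.equal(sac[order(grp)]$total, direct[order(grp)]$total)
```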

Also see footnote 3 in the data.table intro vignette :

We wonder how many people are deploying parallel techniques to code that is vector scanning

That's trying to say "sure, parallel is significantly faster, but how long should it really take with an efficient algorithm?".

BUT if you've profiled (using Rprof), and the task per group really is compute intensive, then the 3 posts on datatable-help including the word "multicore" might help:

multicore posts on datatable-help

Of course there are many tasks where parallelization would be nice in data.table, and there is a way to do it. But it hasn't been done yet, since usually other factors bite, so it's been low priority. If you can post reproducible dummy data with benchmarks and Rprof results, that would help increase the priority.

Matt Dowle answered Oct 10 '22 00:10


I've done some tests per @matt dowle's prior mantra of Rprof, Rprof, Rprof.

What I find is that the decision to parallelize is context dependent, but the gain is likely to be significant. Depending on the test operations (e.g. foo below, which can be customized) and the number of cores utilized (I try both 8 and 24), I get different results.

Below results:

  1. using 8 cores, I see a 21% improvement in this example for parallelization
  2. using 24 cores, I see 14% improvement.

I also looked at some real-world (non-shareable) data and operations, which show a larger improvement (33% or 25%, two different tests) from parallelizing with 24 cores. Edit May 2018: a new set of real-world example cases is showing closer to 85% improvement from parallel operations with 1000 groups.

    R> sessionInfo() # 24 core machine:
    R version 3.3.2 (2016-10-31)
    Platform: x86_64-pc-linux-gnu (64-bit)
    Running under: CentOS Linux 7 (Core)

    attached base packages:
    [1] parallel  stats     graphics  grDevices utils     datasets  methods
    [8] base

    other attached packages:
    [1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

    R> sessionInfo() # 8 core machine:
    R version 3.3.2 (2016-10-31)
    Platform: x86_64-apple-darwin13.4.0 (64-bit)
    Running under: macOS Sierra 10.12.4

    attached base packages:
    [1] parallel  stats     graphics  grDevices utils     datasets  methods   base

    other attached packages:
    [1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4

Example below:

    library(data.table)
    library(stringi)
    library(microbenchmark)

    set.seed(7623452L)
    my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

    my_mat <- matrix(rnorm(1e5), ncol= 20)
    dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

    foo <- function(dt) {
      dt2 <- dt ## needed for .SD lock
      nr <- nrow(dt2)

      idx <- sample.int(nr, 1, replace=FALSE)

      dt2[idx,][, `:=` (
        new_var1= V1 / V2,
        new_var2= V4 * V3 / V10,
        new_var3= sum(V12),
        new_var4= ifelse(V10 > 0, V11 / V13, 1),
        new_var5= ifelse(V9 < 0, V8 / V18, 1)
      )]

      return(dt2[idx,])
    }

    split_df <- function(d, var) {
      base::split(d, get(var, as.environment(d)))
    }

    foo2 <- function(dt) {
      dt2 <- split_df(dt, "grps")

      require(parallel)
      cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
      clusterExport(cl, varlist= "foo")
      clusterExport(cl, varlist= "dt2", envir = environment())
      clusterEvalQ(cl, library("data.table"))

      dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)

      parallel::stopCluster(cl)
      return(rbindlist(dt2))
    }

    print(parallel::detectCores()) # 8

    microbenchmark(
      serial= dt[,foo(.SD), by= "grps"],
      parallel= foo2(dt),
      times= 10L
    )

    Unit: seconds
         expr      min       lq     mean   median       uq      max neval cld
       serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
     parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a

    print(parallel::detectCores()) # 24

    Unit: seconds
         expr       min        lq     mean   median       uq      max neval cld
       serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
     parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a

Profiling:

We can use this answer to respond more directly to @matt dowle's original comment about profiling.

As a result, we do see that the majority of compute time is handled by base and not data.table. data.table operations themselves are, as expected, exceptionally fast. While some might argue that this is evidence that there is no need for parallelism within data.table, I posit that this workflow/operation-set is not atypical. That is, it is my strong suspicion that the majority of large data.table aggregations involve a substantial amount of non-data.table code, and that this is correlated with interactive use vs development / production use. I therefore conclude that parallelism would be valuable within data.table for large aggregations.

    library(profr)

    prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                           simplify = FALSE)

    pkg_timing <- fun_timing <- vector("list", length= 100)
    for (i in 1:100) {
      fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
      pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
    }

    sort(sapply(fun_timing, sum)) #  no large outliers

    fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
      ret <- data.table(fun= names(x), time= x)
      ret[, pct_time := time / sum(time)]
      return(ret)
    }))

    pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
      ret <- data.table(pkg= names(x), time= x)
      ret[, pct_time := time / sum(time)]
      return(ret)
    }))

    fun_timing2[, .(total_time= sum(time),
                    avg_time= mean(time),
                    avg_pct= round(mean(pct_time), 4)), by= "fun"][
      order(avg_time, decreasing = TRUE),][1:10,]

    pkg_timing2[, .(total_time= sum(time),
                    avg_time= mean(time),
                    avg_pct= round(mean(pct_time), 4)), by= "pkg"][
      order(avg_time, decreasing = TRUE),]

Results:

                          fun total_time avg_time avg_pct
     1:               base::[    670.362  6.70362  0.2694
     2:      NA::[.data.table    667.350  6.67350  0.2682
     3:       .GlobalEnv::foo    335.784  3.35784  0.1349
     4:              base::[[    163.044  1.63044  0.0655
     5:   base::[[.data.frame    133.790  1.33790  0.0537
     6:            base::%in%    120.512  1.20512  0.0484
     7:        base::sys.call     86.846  0.86846  0.0348
     8: NA::replace_dot_alias     27.824  0.27824  0.0112
     9:           base::which     23.536  0.23536  0.0095
    10:          base::sapply     22.080  0.22080  0.0089

              pkg total_time avg_time avg_pct
    1:       base   1397.770 13.97770  0.7938
    2: .GlobalEnv    335.784  3.35784  0.1908
    3: data.table     27.262  0.27262  0.0155

crossposted in github/data.table

Alex W answered Oct 09 '22 23:10