foreach %dopar% + RPostgreSQL

I am using RPostgreSQL to connect to a local database. The setup works just fine on my Linux machine. R 2.11.1, Postgres 8.4.

I was playing with 'foreach' and the multicore (doMC) parallel backend to wrap some repetitive queries (numbering a few thousand) and append the results into a data structure. Curiously enough, it works if I use %do% but fails when I switch to %dopar%, except when there is only one iteration (as shown below).

I wondered whether it had something to do with a single connection object, so I created 10 connection objects and, depending on the value of i modulo 10, used a particular connection object for that query (the snippet below shows just 2 connection objects). The expression that gets evaluated, eval(expr.01), is the query, which depends on 'i'.
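
For reference, a minimal sketch of that multi-connection setup (the dbname here is just a placeholder, since it isn't shown in the snippets below) would look something like this:

library(RPostgreSQL)

drv <- dbDriver("PostgreSQL")

# open 10 connections up front; "somedb" is a placeholder dbname
cons <- lapply(1:10, function(k) dbConnect(drv, dbname = "somedb"))

# inside the foreach body, pick a connection based on i modulo 10
# con <- cons[[(i %% 10) + 1]]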

I can't make sense of these particular error messages. I am wondering whether there is any way to make this work.

Thanks.
Vishal Belsare

R snippet follows:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
> 

EDIT: I changed a few things (still unsuccessfully), but a few things came to light. Connection objects made inside the loop and not closed via dbDisconnect lead to hanging connections, as evident from the Postgres logs in /var/log. A few new error messages show up when I do this:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE,
+   .packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL");
+   con0 <- dbConnect(drv0, dbname='nseindia');
+   list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
+   dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"


2 Answers

It's more efficient to create the database connection once per worker, rather than once per task. Unfortunately, mclapply doesn't provide a mechanism for initializing the workers before executing tasks, so this isn't easy to do with the doMC backend. If you use the doParallel backend instead, you can initialize the workers using clusterEvalQ. Here's an example of how to restructure the code:

library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})

Since doParallel and clusterEvalQ are using the same cluster object cl, the foreach loop will have access to the database connection object con when executing the tasks.
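
One way to sanity-check that every worker really holds a live connection (a quick sketch, not part of the original answer) is to run a trivial query on each worker:

clusterEvalQ(cl, {
  dbGetQuery(con, "SELECT 1")
})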

Steve Weston


The following works and gives roughly a 1.5x speedup over the sequential form. As a next step, I am wondering whether it is possible to attach a connection object to each of the workers spawned by registerDoMC. If so, there would be no need to create/destroy the connection objects, which would avoid overwhelming the PostgreSQL server with connections.

pgparquery <- function(i) {
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname='nsdq')
  lst <- eval(expr.01)   # contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  dbDisconnect(con)
  result <- list(date=dt, idreuters=tmp$idreuters)
  return(result)
}

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE,
                          .packages=c("DBI", "RPostgreSQL")) %dopar% { pgparquery(i) }
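
For completeness, this assumes a doMC backend was registered beforehand, e.g. something along these lines (the core count is arbitrary):

library(doMC)
registerDoMC(cores = 4)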

--
Vishal Belsare
