I am using RPostgreSQL to connect to a local database. The setup works just fine on my Linux machine. R 2.11.1, Postgres 8.4.
I was experimenting with 'foreach' and the multicore (doMC) parallel backend to wrap some repetitive queries (a few thousand of them) and append the results to a data structure. Curiously, it works with %do% but fails when I switch to %dopar%, except when there is only one iteration (as shown below).
I wondered whether the problem was sharing a single connection object, so I created 10 connection objects and gave each query one of them according to i modulo 10 (shown below with just 2 connection objects). The evaluated expression, eval(expr.01), contains the query, which depends on the value of 'i'.
I can't make sense of these particular error messages. I am wondering whether there is any way to make this work.
Thanks.
Vishal Belsare
R snippet follows:
> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% {
+ if (i %% 2 == 0) {con <- con0};
+ if (i %% 2 == 1) {con <- con1};
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
[1] 411 414 2140 2406 4490 4507 4519 4570 4571 4572 4703 4731
[109] 48765 84312 91797
> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% {
+ if (i %% 2 == 0) {con <- con0};
+ if (i %% 2 == 1) {con <- con1};
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) :
no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) :
no function to return from, jumping to top level
Error in { :
task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
>
EDIT: I changed a few things (still unsuccessful), but a few things have come to light. Connection objects created inside the loop and not closed via dbDisconnect lead to hanging connections, as is evident from the Postgres log in /var/log. A few new error messages show up when I do this:
> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE,
.packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL");
con0 <- dbConnect(drv0, dbname='nseindia');
list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) :
no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) :
no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) :
no function to return from, jumping to top level
Error in { :
task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
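Independently of the error, the braced block in the second snippet returns the value of its last expression, dbDisconnect(con0), so even a successful run would collect the result of dbDisconnect() rather than list(idreuters=...). A minimal sketch of the usual shape, using only base R: open_fake_con() and close_fake_con() are hypothetical stand-ins for dbConnect()/dbDisconnect() that just count open connections. The cleanup is registered with on.exit() so it runs even if the query fails, and the result is the last expression. on.exit() needs a function to attach to, which is one reason to wrap the loop body in a function, as pgparquery does later in the thread.

```r
# Hypothetical stand-ins for dbConnect()/dbDisconnect(); they only track
# how many "connections" are currently open so the cleanup can be checked.
open_count <- 0L
open_fake_con <- function() {
  open_count <<- open_count + 1L
  list(id = open_count)
}
close_fake_con <- function(con) {
  open_count <<- open_count - 1L
  invisible(TRUE)
}

# One task: open, register the cleanup, compute, and return the result
# (not the value of the disconnect call).
run_task <- function(i, fail = FALSE) {
  con <- open_fake_con()
  on.exit(close_fake_con(con), add = TRUE)  # runs on normal exit and on error
  if (fail) stop("query failed for i = ", i)
  list(i = i, idreuters = i * 10L)          # last expression = return value
}

res <- run_task(1588)
str(res)
try(run_task(1589, fail = TRUE), silent = TRUE)
open_count  # back to 0: both "connections" were closed
```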
It's more efficient to create the database connection once per worker, rather than once per task. Unfortunately, mclapply doesn't provide a mechanism for initializing the workers before executing tasks, so it's not easy to do this using the doMC backend, but if you use the doParallel backend, you can initialize the workers using clusterEvalQ. Here's an example of how to restructure the code:
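library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)
# Run once on every worker: load the packages and open a connection that
# stays in the worker's global environment for all later tasks.
clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL  # don't return the connection object to the master
})
# .noexport="con" stops foreach from exporting a master-side 'con'
# that would shadow each worker's own connection.
id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  # contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dbClearResult(qry)    # release the result set before the next query
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}
# Close each worker's connection, then shut the workers down.
clusterEvalQ(cl, {
  dbDisconnect(con)
})
stopCluster(cl)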
Since doParallel and clusterEvalQ use the same cluster object cl, the foreach loop has access to the database connection object con on each worker when executing the tasks.
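The once-per-worker idea can be checked without a database. A minimal sketch using only the base parallel package (which doParallel builds on), with a hypothetical environment worker_con standing in for the connection: clusterEvalQ() creates it once on each worker, every task then reuses it, and a final clusterEvalQ() collects how many tasks each worker served (in real code, that is where dbDisconnect(con) would go).

```r
library(parallel)

# Two PSOCK workers; each is a separate R process.
cl <- makePSOCKcluster(2)

# Per-worker setup, run once per worker (stands in for dbConnect()).
# 'worker_con' is a hypothetical placeholder, not a real connection.
invisible(clusterEvalQ(cl, {
  worker_con <- new.env()
  worker_con$pid <- Sys.getpid()
  worker_con$queries <- 0L
  NULL
}))

# Tasks reuse the worker's global 'worker_con' instead of opening their own.
res <- parLapply(cl, 1:20, function(i) {
  worker_con$queries <- worker_con$queries + 1L
  c(i = i, pid = worker_con$pid)
})

# Per-worker query counts (a real version would disconnect here).
counts <- unlist(clusterEvalQ(cl, worker_con$queries))
stopCluster(cl)

counts       # tasks handled by each worker
sum(counts)  # 20: every task used a pre-created per-worker object
```

Because the environment is created once per worker and mutated in place, the counts show that 20 tasks were served by only 2 "connections".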
The following works and gives a speed-up of about 1.5x over the sequential version. As a next step, I am wondering whether it is possible to attach a connection object to each of the workers spawned by registerDoMC. If so, there would be no need to create and destroy a connection for every task, which would also avoid overwhelming the PostgreSQL server with connections.
pgparquery <- function(i) {
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname='nsdq')
  on.exit(dbDisconnect(con))  # close the connection even if the query fails
  lst <- eval(expr.01)        # contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dbClearResult(qry)          # release the result set
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}
id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)}
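With the fork-based doMC backend the workers can't be initialized ahead of time, but one way to bound the number of connections (a swapped-in technique, not something from the posts above) is to give each worker a chunk of indices instead of a single index, and open one connection per chunk. A minimal sketch with base parallel::mclapply(), the fork-based function doMC relies on, so Unix-only like doMC itself. open_fake_con(), close_fake_con() and run_query() are hypothetical stand-ins for dbConnect(), dbDisconnect() and the fetch(dbSendQuery(...)) call.

```r
library(parallel)

# Hypothetical stand-ins for dbConnect()/dbDisconnect()/the query.
open_fake_con  <- function() list(pid = Sys.getpid())
close_fake_con <- function(con) invisible(TRUE)
run_query      <- function(con, i) i * 10L  # placeholder for fetch(dbSendQuery(...))

ids    <- 1588:3638
n_core <- 2L

# Split the task indices into one chunk per core.
chunks <- splitIndices(length(ids), n_core)

# Each forked child opens ONE connection for its whole chunk and closes it.
per_chunk <- mclapply(chunks, function(idx) {
  con <- open_fake_con()
  on.exit(close_fake_con(con), add = TRUE)
  lapply(ids[idx], function(i) list(i = i, idreuters = run_query(con, i)))
}, mc.cores = n_core)

results <- do.call(c, per_chunk)  # flatten back to one element per id
length(results)                   # one element per id
```

With doMC registered, the same shape would be foreach(idx = chunks, .combine = c) %dopar% { ... }, giving n_core connections per foreach call instead of one per query.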
--
Vishal Belsare