I'm trying to set up a parallel task where each worker needs to make database queries. I'm trying to give each worker its own connection, as seen in this question, but each time I try, it returns <Expired PostgreSQLConnection:(2781,0)> for however many workers I registered.
Here's my code:
cl <- makeCluster(detectCores())
registerDoParallel(cl)
clusterEvalQ(cl, {
library(RPostgreSQL)
drv<-dbDriver("PostgreSQL")
con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
})
If I try to run my foreach despite the error, it fails with task 1 failed - "expired PostgreSQLConnection"
When I go into the postgres server status it shows all the active sessions that were created.
I don't have any problems interacting with postgres from my main R instance.
If I run
clusterEvalQ(cl, {
library(RPostgreSQL)
drv<-dbDriver("PostgreSQL")
con<-dbConnect(drv, user="user", password="password", dbname="ISO",host="localhost")
dbGetQuery(con, "select inet_client_port()")
})
then it will return all the client ports. It doesn't give me the expired notice but if I try to run my foreach command it will fail with the same error.
Edit:
I've tried this on Ubuntu and 2 Windows computers; they all give the same error.
Another Edit:
Now 3 Windows computers.
I was able to reproduce your problem locally. I am not entirely sure, but I think the problem is related to the way clusterEvalQ works internally. For example, you say that dbGetQuery(con, "select inet_client_port()") gave you the client port output. If the query were actually evaluated on the cluster nodes, I would expect you to be unable to see this output (the same way that you are unable to directly read any other output or print statements that are executed on the external cluster nodes).
Hence, it is my understanding that the evaluation is somehow first performed in the local environment and the relevant functions and variables are subsequently copied/exported to the individual cluster nodes. This would work for any other type of function or variable, but obviously not for database connections. If the connections and port mappings are tied to the master R instance, then the connections will not work from the worker instances. You would also get the exact same error if you tried to use the clusterExport function to export connections that were created on the master instance.
As an alternative, you can create separate connections inside the individual foreach tasks. I have verified with a local database that the following works:
library(doParallel)
nrCores <- detectCores()
cl <- makeCluster(nrCores)
registerDoParallel(cl)
# Load the database packages on every worker
clusterEvalQ(cl, library(RPostgreSQL))
clusterEvalQ(cl, library(DBI))
result <- foreach(i = 1:nrCores) %dopar%
{
  # Open the connection inside the task so it is created in the worker process
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, user = "user", password = "password", dbname = "ISO", host = "localhost")
  queryResult <- dbGetQuery(con, "fetch something...")
  # Close the connection before the task returns
  dbDisconnect(con)
  return(queryResult)
}
stopCluster(cl)
However, you now have to take into account that every foreach iteration opens and closes a new connection, which may incur some performance overhead. You can reduce this by splitting up your queries/data intelligently so that a lot of work gets done during the same iteration. Ideally, you should split the work into exactly as many chunks as you have cores available.
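As a rough sketch of that splitting idea, the code below divides a vector of ids into one chunk per core, so each foreach iteration opens a single connection and runs one query for its whole chunk. The helper names chunk_ids and run_chunked, the table my_table and its id column are all hypothetical; the connection settings are copied from the example above. It assumes the ids are numeric, and run_chunked is only defined here, not called, because it needs a reachable database.

```r
library(parallel)

# Split a vector of ids into at most n_chunks contiguous pieces
# (hypothetical helper; splitIndices() comes from the parallel package).
chunk_ids <- function(ids, n_chunks) {
  if (length(ids) == 0) return(list())
  n_chunks <- max(1, min(n_chunks, length(ids)))
  lapply(splitIndices(length(ids), n_chunks), function(i) ids[i])
}

# Hypothetical driver function: one connection and one query per chunk.
# Requires doParallel, RPostgreSQL and a running PostgreSQL server.
run_chunked <- function(ids, nrCores = detectCores()) {
  library(doParallel)
  cl <- makeCluster(nrCores)
  on.exit(stopCluster(cl))
  registerDoParallel(cl)
  chunks <- chunk_ids(ids, nrCores)
  foreach(chunk = chunks, .packages = "RPostgreSQL") %dopar% {
    # The connection is created inside the task, i.e. in the worker process
    con <- dbConnect(dbDriver("PostgreSQL"), user = "user", password = "password",
                     dbname = "ISO", host = "localhost")
    # my_table and id are placeholders; ids are assumed to be numeric
    sql <- sprintf("select * from my_table where id in (%s)",
                   paste(chunk, collapse = ","))
    # Always close the connection, even if the query fails
    tryCatch(dbGetQuery(con, sql), finally = dbDisconnect(con))
  }
}
```

Calling run_chunked(ids) would return a list with one data frame per chunk, which you could combine afterwards with do.call(rbind, ...). With this layout the connection overhead is paid once per core rather than once per id.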