I am currently querying multiple databases using a list of stored connections and a for loop, but I want to speed up the process by querying all of the databases asynchronously.
I read online about the promises functionality, but it didn't work as anticipated.
results <- data.table()
for (connection in databases) {
  temp <- data.table(dbGetQuery(connection, "super secret sql query"))
  results <- rbind(results, temp)
  dbDisconnect(connection)
}
results$sum <- as.numeric(results$sum)
return(results)
I want to change this for loop into a single statement that sends the same query to multiple databases in parallel and collects the results.
Because future will try to auto-transfer variables to the nodes and will not transfer those containing external pointers (including database connection objects), you need to devise a wrapper that creates the connections on the nodes for you. Here's a suggestion; it's under-tested but gives you a start.
Update: I think stale connections are best handled at the driver level, so I'm suggesting the use of pool. (If you know a better way to tell whether a connection has expired than tryCatch(DBI::dbGetQuery(...), error=function(e) "expired"), I'm all ears ... most ODBC drivers and odbc itself have a short-sighted view of what a "valid connection" means in this context.)
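For reference, the tryCatch check I mean looks something like the sketch below; the helper name and the "SELECT 1" probe query are just illustrative, not something pool or odbc provide:
is_conn_alive <- function(conn) {
  # crude liveness probe: any error on a trivial query means "treat as expired"
  tryCatch({
    DBI::dbGetQuery(conn, "SELECT 1")
    TRUE
  }, error = function(e) FALSE)
}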
cred <- list(drv = odbc::odbc(), server = "server.address", user = "me", password = "secret")
mydb <- function(cred) {
  library(DBI)
  library(odbc)
  library(pool)
  # if the credentials have changed, close and forget the cached pool
  if (exists(".cred") && !is.null(.cred) && !identical(.cred, cred)) {
    if (exists(".pool") && !is.null(.pool)) {
      pool::poolClose(.pool)
      .pool <<- NULL
    }
    .cred <<- NULL
  }
  # create one pool per process, on first use
  if (!exists(".pool") || is.null(.pool)) {
    .pool <<- do.call(pool::dbPool, cred)
    .cred <<- cred
  }
  conn <- pool::poolCheckout(.pool)
  # hack: register an on.exit in the *caller's* frame so the checked-out
  # connection is always returned to the pool and doesn't "leak"
  do.call(on.exit, list(substitute(suppressWarnings(pool::poolReturn(conn)))),
          envir = parent.frame())
  conn
}
It makes a sort-of-sloppy decision to store a viable pool (and its credentials) in each node's global environment, in dot-variables intended not to clash with anything else. cred should transfer to the nodes just fine, since it is just a list. mydb(cred) will create a new connection if none exists, reuse the existing one if the credentials are the same, or close the old connection and create a new one if the credentials have changed for some reason.
POC:
library(DBI)
library(odbc)
library(pool)
library(future)
library(future.apply) # only required for this demo, future_lapply
cl <- parallel::makeCluster(3)
plan(cluster, workers=cl)
cred <- list(drv = odbc::odbc(), server = "sqlserver.ip.address", user = "me", password = "secret")
DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
# R_pid
# 1 7500
DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
# R_pid
# 1 7500
### single future call
a %<-% DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
a
# R_pid
# 1 9732
### multiple future calls
future_lapply(1:4, function(ign) DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid")))
# [[1]]
# R_pid
# 1 9732
# [[2]]
# R_pid
# 1 6132
# [[3]]
# R_pid
# 1 6132
# [[4]]
# R_pid
# 1 8480
While the attempt is to not leak database objects, for some reason I still get warnings about leaked objects ... which suggests my fancy on.exit(..., envir=parent.frame()) is not doing everything I hoped it would. I believe the warning is relatively benign, but it does suggest some sloppiness in connection management.
I'm being a bit explicit about loading packages, because otherwise I see errors of the form:
# Error in (function (classes, fdef, mtable) :
#   unable to find an inherited method for function 'dbGetQuery' for signature '"Microsoft SQL Server", "character"'
and while I'm testing against mssql and you're using postgresql, I think the problem is independent of the backend. It was remedied by the explicit loading of packages and some other nuances in the code.
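To tie this back to the question's multi-database loop: with mydb defined and the cluster plan from the POC in place, the fan-out could look something like the sketch below. The credential lists and the final column fix-up are placeholders mirroring the question, not tested code.
library(data.table)

creds <- list(
  list(drv = odbc::odbc(), server = "db1.address", user = "me", password = "secret"),
  list(drv = odbc::odbc(), server = "db2.address", user = "me", password = "secret")
)

# one worker connection (pool) per node, same query against every database
results <- rbindlist(
  future_lapply(creds, function(cred) {
    data.table(DBI::dbGetQuery(mydb(cred), "super secret sql query"))
  })
)
results$sum <- as.numeric(results$sum)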
Unfortunately, it isn't possible to send queries simultaneously over a list of existing RPostgres connections. This is because the RPostgres/DBI methods dbGetQuery and the underlying dbSendQuery both block the R process until the result has been returned, and there is no API for sending the query without blocking.
The accepted answer to this question shows how to spread the query sending across a pool of parallel worker processes, and how to cache connections so that they are created once per process instead of once per query. But in the OP's situation, in which only one query is run against each database, there is no advantage to that caching, and if your connections have already been established in the main R process, they will each need to be re-created in a worker process.
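If caching buys you nothing because each database is queried exactly once, a simpler route is to ship only the connection parameters to the workers and let each future open, use, and close its own connection. A rough sketch, assuming RPostgres and made-up connection parameters:
library(DBI)
library(future)
library(future.apply)

plan(multisession, workers = 4)

# connection *parameters* (plain lists) transfer fine; connection objects do not
conn_args <- list(
  list(dbname = "db1", host = "host1", user = "me", password = "secret"),
  list(dbname = "db2", host = "host2", user = "me", password = "secret")
)

results <- future_lapply(conn_args, function(args) {
  conn <- do.call(DBI::dbConnect, c(list(RPostgres::Postgres()), args))
  on.exit(DBI::dbDisconnect(conn), add = TRUE)
  DBI::dbGetQuery(conn, "super secret sql query")
})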
As HenrikB (the author of the future package) noted in the comments, it is not possible to "share" database connections between processes in R -- the copies made by the future package (including those copied from a child back to a parent process, via returning a value from the future) do not function.
# Example 1: a connection created in the parent process is not valid
# inside a future (the external pointer does not survive the copy)
db_conn <- DBI::dbConnect(...)
f <- future::future({
  dplyr::tbl(db_conn, 'table_name')
})
future::value(f)
# Error: Invalid connection

# Example 2: the reverse direction fails too -- a connection created inside
# a future is not valid after being returned to the parent process
connection_future <- future::future({
  DBI::dbConnect(...)
})
db_conn <- future::value(connection_future)
dplyr::tbl(db_conn, 'table_name')
# Error: Invalid connection
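What does work is keeping the connection's entire lifecycle inside one future, so the external pointer never has to cross a process boundary, and returning only materialised results. A minimal sketch (connection arguments elided as above):
f <- future::future({
  conn <- DBI::dbConnect(...)  # same elided connection arguments as above
  on.exit(DBI::dbDisconnect(conn), add = TRUE)
  # return a plain data frame, not a lazy tbl that still references the
  # worker-local connection
  DBI::dbGetQuery(conn, "select * from table_name")
})
future::value(f)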