
How to asynchronously query multiple databases in R

I am currently querying multiple databases using a list of stored connections and a for loop, but I want to speed up the process by querying all of the databases asynchronously.

I read online about the promises functionality but it didn't work as anticipated.

library(DBI)
library(data.table)

results <- data.table()
for (connection in databases) {
  temp <- data.table(dbGetQuery(connection, "super secret sql query"))
  results <- rbind(results, temp)
  dbDisconnect(connection)
}

results$sum <- as.numeric(results$sum)
results

I want to replace this for loop with a single call that sends the same query to all of the databases and returns the combined results.

asked Sep 09 '19 by davesauto

2 Answers

Because future will try to auto-transfer variables to the nodes but will not transfer those that hold external pointers (including database connection objects), you need to devise a wrapper that does this for you. Here's a suggestion; it is under-tested, but it gives you a start.

Update: I think stale-connection handling is best done at the driver level, so I'm suggesting the use of pool. (If you know a better way to tell whether a connection has expired than tryCatch(DBI::dbGetQuery(...), error=function(e) "expired"), I'm all ears ... most ODBC drivers, and odbc itself, have a short-sighted view of what a "valid connection" means in this context.)
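
For reference, that kind of fallback check could look like the sketch below; the helper name and the "SELECT 1" probe query are assumptions of mine, not part of pool or odbc.

library(DBI)

# Hypothetical helper: treat a connection as alive only if a trivial query
# succeeds; some backends may need a different no-op statement than SELECT 1.
conn_is_alive <- function(conn) {
  tryCatch({
    DBI::dbGetQuery(conn, "SELECT 1")
    TRUE
  }, error = function(e) FALSE)
}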

cred <- list(drv = odbc::odbc(), server = "server.address", user = "me", password = "secret")
mydb <- function(cred) {
  library(DBI)
  library(odbc)
  library(pool)
  if (exists(".cred") && !is.null(.cred) && !identical(.cred, cred)) {
    if (exists(".pool") && !is.null(.pool)) {
      pool::poolClose(.pool)
      .pool <<- NULL
    }
    .cred <<- NULL
  }
  if (!exists(".pool") || is.null(.pool)) {
    .pool <<- do.call(pool::dbPool, cred)
    .cred <<- cred
  }
  conn <- pool::poolCheckout(.pool)
  # hack to always return the pool object, don't "leak" it
  do.call(on.exit, list(substitute(suppressWarnings(pool::poolReturn(conn)))),
          envir = parent.frame())
  conn
}

It makes a somewhat sloppy decision to store a viable connection (and its credentials) in each node's global environment, in dot-prefixed variables intended not to clash with anything else. cred should transfer just fine to the nodes, since it is just a list. mydb(cred) will create a new connection if none exists, return the existing connection if the credentials are unchanged, or close the old connection and create a new one if the credentials have changed for some reason.

POC:

library(DBI)
library(odbc)
library(pool)
library(future)
library(future.apply) # only required for this demo, future_lapply
cl <- parallel::makeCluster(3)
plan(cluster, workers=cl)
cred <- list(drv = odbc::odbc(), server = "sqlserver.ip.address", user = "me", password = "secret")

DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
#   R_pid
# 1  7500
DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
#   R_pid
# 1  7500

### single future call
a %<-% DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid"))
a
#   R_pid
# 1  9732

### multiple future calls
future_lapply(1:4, function(ign) DBI::dbGetQuery(mydb(cred), paste("select", Sys.getpid(), " as R_pid")))
# [[1]]
#   R_pid
# 1  9732
# [[2]]
#   R_pid
# 1  6132
# [[3]]
#   R_pid
# 1  6132
# [[4]]
#   R_pid
# 1  8480

While the intent is not to leak database objects, for some reason I still get warnings about leaked objects ... this suggests my fancy on.exit(..., envir=parent.frame()) is not doing everything I hoped it would. I believe the warning is relatively benign, but it does point to some sloppiness in connection management.

I'm being explicit about loading packages because otherwise I see errors of the form:

# Error in (function (classes, fdef, mtable)  : 
#   unable to find an inherited method for function 'dbGetQuery' for signature '"Microsoft SQL Server", "character"'

and while I'm testing against mssql and you're using postgresql, I think the problem is independent of the backend. It was remedied by loading the packages explicitly and by some other nuances in the code.
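
To map this back to the original loop, one possible shape (a sketch only; the all_creds list, the server names, and the reuse of the question's placeholder query are assumptions on my part) is to future_lapply over one credential list per database:

library(future.apply)
library(data.table)

# Hypothetical: one credential list per target database. mydb() and the
# credential lists are plain R objects, so future should export them to
# the workers automatically.
all_creds <- list(
  list(drv = odbc::odbc(), server = "server1.address", user = "me", password = "secret"),
  list(drv = odbc::odbc(), server = "server2.address", user = "me", password = "secret")
)

pieces <- future_lapply(all_creds, function(cred) {
  data.table(DBI::dbGetQuery(mydb(cred), "super secret sql query"))
})
results <- rbindlist(pieces)
results$sum <- as.numeric(results$sum)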

answered by r2evans


Unfortunately, it isn't possible to send queries simultaneously to a list of existing RPostgres connections. This is because the RPostgres/DBI methods dbGetQuery and the underlying dbSendQuery both block the R process until the result has been returned, and there is no API to send the query without this blocking.

The accepted answer to this question shows how to share the query sending among a pool of parallel worker processes, and how to cache connections so that they are created once per process instead of once per query. But in the OP's situation, where only one query is run against each database, that caching provides no advantage, and if your connections have already been established in the main R process, each one will need to be re-created in a worker process, along the lines of the sketch below.
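
A minimal sketch of that approach (assuming RPostgres, a multisession plan, and a hypothetical conn_args list holding per-database connection parameters rather than connection objects):

library(future)
library(future.apply)
library(DBI)
library(RPostgres)

plan(multisession, workers = 4)

# Hypothetical connection *parameters*; the connections themselves are
# created inside each worker, since connection objects cannot be shared.
conn_args <- list(
  list(dbname = "db1", host = "host1.example", user = "me", password = "secret"),
  list(dbname = "db2", host = "host2.example", user = "me", password = "secret")
)

results_list <- future_lapply(conn_args, function(args) {
  conn <- do.call(DBI::dbConnect, c(list(RPostgres::Postgres()), args))
  on.exit(DBI::dbDisconnect(conn), add = TRUE)
  DBI::dbGetQuery(conn, "super secret sql query")
})

results <- data.table::rbindlist(results_list)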

The problem with using databases across multiple processes

As HenrikB (the author of the future package) noted in the comments, it is not possible to "share" database connections between processes in R -- the copies made by the future package (including those copied from a child back to a parent process by returning from the future) do not work.

# A connection created in the parent process is not usable inside a worker:
db_conn <- DBI::dbConnect(...)

f <- future::future({
  dplyr::tbl(db_conn, 'table_name')
})

future::value(f)
# Error: Invalid connection

# Nor can a connection created inside a worker be returned to the parent:
connection_future <- future::future({
  DBI::dbConnect(...)
})

db_conn <- future::value(connection_future)
dplyr::tbl(db_conn, 'table_name')
# Error: Invalid connection

answered by zmbc