Reconnect to PostgreSQL database with R's pool package

I have an API built with R plumber that connects to a PostgreSQL database using RPostgreSQL and pool (although this would also apply if I was using a Shiny app):

library(plumber)
library(pool)
library(RPostgreSQL)

# create the connection pool
pool <- dbPool(
  drv = PostgreSQL(),
  host = Sys.getenv("DB_HOST"),
  port = 5432,
  dbname = "db",
  user = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASSWORD")
)

# build the API router
pr <- plumb("plumber.R")

# on exit, close the pool
pr$registerHooks(
  list("exit" = function() { poolClose(pool) })
)

# start the API
pr$run()

I want to import new data every day. The easiest way is to create a new database and promote it to production:

CREATE DATABASE db_new;
-- create the tables
-- bulk-insert the data
SELECT pg_terminate_backend (pid) FROM pg_stat_activity WHERE datname = 'db';
DROP DATABASE db;
ALTER DATABASE db_new RENAME TO db;

This is fast and minimizes downtime. The problem is that pool then loses its connection to the database and doesn't automatically attempt to reconnect:

> tbl(pool, "users")
Error in postgresqlExecStatement(conn, statement, ...) : 
  RS-DBI driver: (could not Retrieve the result : FATAL:  terminating connection due to administrator command
server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
)

Even if I weren't replacing the database every day, DB servers occasionally restart, and that would also break my app. Reconnection doesn't seem to be a feature of pool, RPostgreSQL, or DBI. Does anyone know a way to deal with this problem?

asked Mar 04 '19 by Devin

1 Answer

I recently ran into a similar issue: MySQL connections were being closed once the instance's wait_timeout was exceeded. I came across your post on RStudio Community and was inspired by your solution. In case you are still using it and want a version that avoids the extra query while wrapping the actual functions you call, here is a reprex of what I came up with, along with an example showing that it works:

library(dplyr, warn.conflicts = FALSE)
library(pool)
library(RMariaDB)

generate_safe_query <- function(pool) {
  function(db_function, ...) {
    tryCatch({
      db_function(pool, ...)
    }, error = function(e) {
      if (grepl("Lost connection to MySQL server during query", e$message)) {
        # Preserve `validationInterval` so that it can be restored
        validation_interval <- pool$validationInterval
        # Trigger destruction of dead connection
        pool$validationInterval <- 0
        refreshed_connection <- poolCheckout(pool)
        poolReturn(refreshed_connection)
        # Restore original `validationInterval`
        pool$validationInterval <- validation_interval
        # Execute the query with the new connection
        db_function(pool, ...)
      } else {
        # Unexpected error
        stop(e)
      }
    })
  }
}

mysql_pool <- dbPool(MariaDB(),
                     host = "127.0.0.1",
                     username = "root",
                     password = "",
                     dbname = "test")

safe_query <- generate_safe_query(mysql_pool)

# Works
safe_query(tbl, "notes")
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note 
#>   <int> <chr>
#> 1     1 NOTE1

# Set the `wait_timeout` to 5 seconds for this session
invisible(safe_query(dbExecute, "SET SESSION wait_timeout = 5"))

# Wait longer than `wait_timeout` to trigger a disconnect
Sys.sleep(6)

# Still works; warning will appear notifying that connection was
# destroyed and replaced with a new one
safe_query(tbl, "notes")
#> Warning: It wasn't possible to activate and/or validate the object. Trying
#> again with a new object.
#> # Source:   table<notes> [?? x 2]
#> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
#>      id note 
#>   <int> <chr>
#> 1     1 NOTE1

safe_query(poolClose)
# Or, equivalently: 
# poolClose(mysql_pool)

Created on 2019-05-30 by the reprex package (v0.3.0)

The function returned by generate_safe_query will work with any database query function (e.g. dbExecute, dbGetQuery, etc.). Obviously, you'll want to update the error message it matches to suit your needs.
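For the PostgreSQL setup in the question, the same pattern might look like the sketch below. This is an assumption-laden adaptation, not something tested against the question's database: the error pattern it matches is taken from the error output shown above, and the pool arguments are copied from the question.

library(dplyr, warn.conflicts = FALSE)
library(pool)
library(RPostgreSQL)

# Same wrapper as the MySQL reprex above, with the error match adapted to
# the PostgreSQL messages from the question (the regex is an assumption
# and may need tuning for your server)
generate_safe_query <- function(pool) {
  function(db_function, ...) {
    tryCatch({
      db_function(pool, ...)
    }, error = function(e) {
      if (grepl("terminating connection|server closed the connection",
                e$message)) {
        # Preserve `validationInterval` so it can be restored
        validation_interval <- pool$validationInterval
        # Force validation so the dead connection is destroyed
        pool$validationInterval <- 0
        poolReturn(poolCheckout(pool))
        # Restore the original `validationInterval`
        pool$validationInterval <- validation_interval
        # Retry the query on a fresh connection
        db_function(pool, ...)
      } else {
        # Unexpected error
        stop(e)
      }
    })
  }
}

pg_pool <- dbPool(
  drv = PostgreSQL(),
  host = Sys.getenv("DB_HOST"),
  port = 5432,
  dbname = "db",
  user = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASSWORD")
)

safe_query <- generate_safe_query(pg_pool)

# After the nightly DROP/RENAME, the first failed query is retried on a
# new connection instead of propagating the error
safe_query(tbl, "users")

The only differences from the reprex are the driver and the error pattern; the retry logic is unchanged.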

I have also opened my own Community topic on an option I think should be included in dbPool that would alleviate the need for such workarounds.

answered Oct 16 '22 by hugo-pa