Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Listen to PostgreSQL NOTIFY events with Slick

Can I use Slick / Play Framework (Scala) to listen to PostgreSQL NOTIFY statements?

I want to do something similar to this:

http://bjorngylling.com/2011-04-13/postgres-listen-notify-with-node-js.html

like image 804
glarkou Avatar asked Nov 06 '17 08:11

glarkou


1 Answers

I don't think Slick supports PostgreSQL's NOTIFY, but the postgresql-async library does. One can use the latter to create an Akka Streams Source and incorporate it in a Play endpoint that streams the database notifications to a client:

package controllers

import javax.inject.{Inject, Singleton}

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import com.github.mauricio.async.db.util.ExecutorServiceUtils.CachedExecutionContext

import play.api.Logger
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.mvc._

import scala.concurrent.duration._
import scala.concurrent.Await

@Singleton
class DbNotificationController @Inject()(cc: ControllerComponents,
                                         materializer: Materializer)
  extends AbstractController(cc) {

  implicit val mat = materializer

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  val (actor, dbSource) =
    Source.actorRef[String](Int.MaxValue, OverflowStrategy.dropNew)
          .toMat(BroadcastHub.sink[String])(Keep.both)
          .run()

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    Logger.debug(s"Sending the payload: $msg")
    actor ! msg
  }

  def index() = Action {
    Ok(views.html.scaladbnotification())
  }

  def streamDb() = Action {
    Ok.chunked(dbSource via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }
}

In the above controller, when the listener receives a notification from the database, the payload (a String) in the notification is logged and sent to an actor. The messages that are sent to this actor feed the Source that is used in the streamDb endpoint. Before the payload messages are sent to the client, they're converted to Play's EventSource class.


I adapted DbNotificationController from the Play streaming example application, which you can use to experiment. If you would like to do so, obviously you need to integrate DbNotificationController into that project:

  1. Add "com.github.mauricio" %% "postgresql-async" % "0.2.21" to build.sbt.
  2. Set up PostgreSQL as needed, including the NOTIFY, and adjust the database URL in the controller according to your configuration.
  3. Copy and paste DbNotificationController into /app/controllers/.
  4. Copy the following file (call it scaladbnotification.scala.html) into app/views/:
@main {

    <h1>Server Sent Event from DB</h1>

    <h1 id="db"></h1>

    <p>
        DB events are pushed from the Server using a Server Sent Event connection.
    </p>

    <script type="text/javascript" charset="utf-8">
        if (!!window.EventSource) {
            var stringSource = new EventSource("@routes.DbNotificationController.streamDb()");
            stringSource.addEventListener('message', function(e) {
                $('#db').html(e.data.replace(/(\d)/g, '<span>$1</span>'))
            });
        } else {
            $("#db").html("Sorry. This browser doesn't seem to support Server sent event. Check <a href='http://html5test.com/compare/feature/communication-eventSource.html'>html5test</a> for browser compatibility."); 
        }
    </script>    
}

  1. In the /conf/routes file, add the following lines:
    GET /scala/dbNotification           controllers.DbNotificationController.index()
    GET /scala/dbNotification/liveDb    controllers.DbNotificationController.streamDb()
  1. Start the application with sbt run and navigate to the following URL in your browser:

    http://localhost:9000/scala/dbNotification

like image 76
Jeffrey Chung Avatar answered Nov 04 '22 14:11

Jeffrey Chung