Scala Slick: Never ending stream

With Slick you can do the following to produce a stream of results from a table:

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }

That will print all the events in the events table and terminate after the last row.

Assuming you can be notified in some way of when new rows are entered into the events table, is it possible to write a stream that would continuously output events as they were inserted? A sort of tail -f for a DB table.

I don't think Slick supports this natively, but it should be possible to use Akka Streams to help: something that takes from the Slick Source until it is empty, then waits for an event indicating that there is more data in the table, then streams the new data. Possibly by using an ActorPublisher to bind this logic together?
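To make the idea concrete, here is a minimal sketch of that drain, wait, resume loop using only the Scala standard library. Everything in it is hypothetical: Event.id, EventTable, Signal and TailEvents are made-up names, the in-memory table stands in for a real query, and it assumes the table has a monotonically increasing key so that "rows newer than the last one seen" is well defined.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical row type: an increasing id plus the event name.
final case class Event(id: Long, name: String)

// In-memory stand-in for the events table. A real version would run a query
// such as "select ... where id > ? order by id" instead of filtering a buffer.
final class EventTable {
  private val rows = ArrayBuffer.empty[Event]

  def insert(name: String): Event = synchronized {
    val e = Event(rows.size + 1L, name)
    rows += e
    e
  }

  def after(lastSeenId: Long): List[Event] = synchronized {
    rows.filter(_.id > lastSeenId).toList
  }
}

// Signals put on the queue by whatever tells us that rows were inserted.
sealed trait Signal
case object RowsInserted extends Signal
case object Stop extends Signal

object TailEvents {
  // Emits every row newer than the last one seen, then blocks until the next
  // signal and repeats. Returns the id of the last row emitted.
  def tail(table: EventTable, signals: LinkedBlockingQueue[Signal])(emit: Event => Unit): Long = {
    var lastSeenId = 0L
    var running = true
    while (running) {
      val fresh = table.after(lastSeenId)
      fresh.foreach(emit)
      if (fresh.nonEmpty) lastSeenId = fresh.last.id
      // Block here until a notification (or a stop request) arrives.
      running = signals.take() != Stop
    }
    lastSeenId
  }
}
```

Because the loop always re-queries from the last seen id, a burst of notifications for a single batch of rows is harmless: the extra wake-ups simply find nothing new.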

Just wondering if someone has any experience in this area or any advice?

David asked Dec 14 '15 18:12



1 Answer

You were correct about ActorPublisher :) Here is a simple example using PostgreSQL, an async DB driver, and the LISTEN/NOTIFY mechanism:

Actor:

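import akka.stream.actor.ActorPublisher
import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser
import scala.concurrent.Await
import scala.concurrent.duration._

// host, port, db, user, password and channel are assumed to be defined elsewhere.
case class Notification(payload: String) // internal message; the name is chosen for this example

class PostgresListener extends ActorPublisher[String] {
  override def receive = {
    case "go" ⇒
      // The URL scheme is "jdbc:postgresql://", not "jdbc://postgresql://"
      val configuration = URLParser.parse(s"jdbc:postgresql://$host:$port/$db?user=$user&password=$password")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5.seconds)

      connection.sendQuery(s"LISTEN $channel")
      // The listener runs on a driver thread, so hand the payload back to the actor
      connection.registerNotifyListener { message ⇒ self ! Notification(message.payload) }
    // onNext may only be called while there is demand; payloads arriving without demand are dropped here
    case Notification(payload) if totalDemand > 0 ⇒ onNext(payload)
  }
}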

Service:

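// Assumes an ActorSystem named `system` is in scope. ServerSentEvent and
// WithHeartbeats presumably come from the akka-sse library.
def stream: Source[ServerSentEvent, Unit] = {
  // Props is only a recipe; the actor has to be created to get an ActorRef
  val dataPublisherRef = system.actorOf(Props[PostgresListener])
  val dataPublisher = ActorPublisher[String](dataPublisherRef)

  // Tell the actor to connect and start listening
  dataPublisherRef ! "go"

  Source(dataPublisher)
    .map(ServerSentEvent(_))
    .via(WithHeartbeats(10.second))
}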

build.sbt in libraryDependencies:

"com.github.mauricio"  %% "postgresql-async"         % "0.2.18"

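The Postgres trigger should call select pg_notify('foo', 'payload'), where 'foo' is the channel name passed to LISTEN and 'payload' is the string delivered to the listener.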
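For reference, a trigger along these lines could look roughly like the sketch below, with the DDL kept as plain Scala strings so it can be run through whatever SQL client or migration tool is in use. The function and trigger names are hypothetical, and it assumes the events table has a text column called name (as in the question) that is sent as the payload.

```scala
object EventNotifySql {
  // Trigger function: publishes the new row's name on channel 'foo'.
  // notify_event_inserted is a made-up name for this example.
  val createFunction: String =
    """CREATE OR REPLACE FUNCTION notify_event_inserted() RETURNS trigger AS $$
      |BEGIN
      |  PERFORM pg_notify('foo', NEW.name);
      |  RETURN NEW;
      |END;
      |$$ LANGUAGE plpgsql;""".stripMargin

  // Fires the function once for every row inserted into events.
  val createTrigger: String =
    """CREATE TRIGGER events_inserted
      |AFTER INSERT ON events
      |FOR EACH ROW EXECUTE PROCEDURE notify_event_inserted();""".stripMargin
}
```

Notifications are only delivered when the inserting transaction commits, so the listener never sees rows that were rolled back.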

As far as I know, Slick does not support LISTEN.

Anna Zubenko answered Oct 06 '22 11:10
