I have a system using Akka that currently handles incoming streaming data over message queues. When a record arrives, it is processed, the MQ message is acked, and the record is passed on for further handling within the system.
Now I would like to add support for using databases as input.
What would be a good way for the input source to handle a database? It should stream in more than 100M records at the pace the receiver can handle, so I presume reactive/akka-streams.
Slick Library
Slick streaming is how this is usually done.
Extending the slick documentation a bit to include akka streams:
//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name
val action = q.result
type Name = String
val databasePublisher : DatabasePublisher[Name] = db stream action
import akka.stream.scaladsl.Source
val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
Now akkaSourceFromSlick is like any other Akka stream Source.
"Old School" ResultSet
It is also possible to use a plain ResultSet, without Slick, as the "engine" for an Akka stream. We will utilize the fact that a stream Source can be instantiated from an Iterator.
First create the ResultSet using standard jdbc techniques:
import java.sql._
import scala.util.Try
// Deferred, so the query only runs when the generator is invoked
val resultSetGenerator : () => Try[ResultSet] = () => Try {
  val statement : Statement = ???
  statement executeQuery "SELECT Name from Coffees"
}
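A freshly created ResultSet already has its cursor positioned before the first row, so the next step only matters when a ResultSet is reused. Note that beforeFirst() throws an SQLException on a TYPE_FORWARD_ONLY ResultSet (the JDBC default), so the statement has to be created as scrollable for this call to succeed: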
val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] =
(resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
Once we start iterating through rows we'll have to pull the value from the correct column:
val getNameFromResultSet : ResultSet => Name = _ getString "Name"
And now we can implement the Iterator interface to create an Iterator[Name] from a ResultSet:
val convertResultSetToNameIterator : ResultSet => Iterator[Name] =
  (resultSet) => new Iterator[Try[Name]] {
    // hasNext advances the cursor, so it must be called exactly once per next()
    override def hasNext : Boolean = resultSet.next
    override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
  } flatMap (_.toOption) // rows whose column read failed are dropped
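The iterator above advances the cursor inside hasNext, so it relies on every consumer calling hasNext exactly once per element. A more defensive variant buffers one row ahead so that repeated hasNext calls are harmless. The following is only a sketch of that idea: the names ResultSetIterators and lookaheadIterator are made up for illustration, and unlike the version above it lets a failed column read propagate instead of dropping the row.

```scala
import java.sql.ResultSet

object ResultSetIterators {
  // Hypothetical helper: wraps a ResultSet so that hasNext can be called
  // repeatedly without skipping rows. The cursor only advances when no
  // row is currently buffered.
  def lookaheadIterator[A](resultSet: ResultSet)(readRow: ResultSet => A): Iterator[A] =
    new Iterator[A] {
      private var buffered: Option[A] = None
      private var exhausted = false

      override def hasNext: Boolean = {
        if (buffered.isEmpty && !exhausted) {
          if (resultSet.next()) buffered = Some(readRow(resultSet))
          else exhausted = true
        }
        buffered.isDefined
      }

      override def next(): A = {
        if (!hasNext) throw new NoSuchElementException("ResultSet is exhausted")
        val row = buffered.get
        buffered = None
        row
      }
    }
}
```

With this helper, ResultSetIterators.lookaheadIterator(resultSet)(_.getString("Name")) would yield an Iterator[String] that can be used in place of the hand-written one.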
And finally, glue all the pieces together to create the function we'll need to pass to Source.fromIterator:
val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] =
  (resultSetGen) => () =>
    // Run the query lazily, then chain the steps; any failure yields an empty iterator
    resultSetGen()
      .flatMap(adjustResultSetBeforeFirst)
      .map(convertResultSetToNameIterator)
      .getOrElse(Iterator.empty)
This Iterator can now feed a Source:
val akkaSourceFromResultSet : Source[Name, _] =
Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
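This implementation is reactive all the way down to the database, provided the JDBC driver fetches rows in batches rather than buffering the whole result (some drivers only do so once a fetch size is set on the statement). In that case the ResultSet pre-fetches a limited number of rows at a time, so data only comes off the hard drive through the database as the stream Sink signals demand.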
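The ResultSet approach above never closes the statement or the cursor, and whether rows really arrive in batches depends on the fetch size. One way to make both explicit is to split the JDBC work into three small functions: open the query with a fetch size hint, read one row, and close everything. The sketch below assumes a Connection is already available. JdbcRowReader, OpenQuery, and the fetchSize parameter are illustrative names, not part of the original answer, and some drivers (PostgreSQL's among them) additionally require auto-commit to be off before they honor the fetch size.

```scala
import java.sql.{Connection, ResultSet, Statement}

object JdbcRowReader {
  // Hypothetical holder for everything that must be closed at the end.
  final case class OpenQuery(statement: Statement, resultSet: ResultSet)

  // create: run the query with a forward-only, read-only cursor and a
  // fetch size hint, so the driver may fetch rows in batches.
  def open(connection: Connection, sql: String, fetchSize: Int): OpenQuery = {
    val statement =
      connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    statement.setFetchSize(fetchSize)
    OpenQuery(statement, statement.executeQuery(sql))
  }

  // read: Some(name) while rows remain, None once the cursor is exhausted.
  def readName(query: OpenQuery): Option[String] =
    if (query.resultSet.next()) Some(query.resultSet.getString("Name")) else None

  // close: release the cursor first, then the statement.
  def close(query: OpenQuery): Unit = {
    query.resultSet.close()
    query.statement.close()
  }
}
```

These functions have the shapes that Akka's Source.unfoldResource(create, read, close) expects, so, assuming a connection in scope, Source.unfoldResource(() => JdbcRowReader.open(connection, "SELECT Name from Coffees", 1000), JdbcRowReader.readName, JdbcRowReader.close) should give a Source[String, _] that closes the query when the stream finishes.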