The aim is to stream data from a database, perform some computation on each element (the computation returns a Future of some case class), and send the results to the user as a chunked response. Currently I can stream the data and send the response without performing any computation, but I cannot work out how to run the computation and then stream its result.
This is the route I have implemented.
def streamingDB1 =
  path("streaming-db1") {
    get {
      val src = Source.fromPublisher(db.stream(getRds))
      complete(src)
    }
  }
The function getRds returns the rows of a table mapped into a case class (using Slick). Now consider a function compute that takes each row as input and returns a Future of another case class. Something like:
def compute(x: Tweet): Future[TweetNew] = ???
How can I apply this function to src and send the results of the computation to the user as a chunked (streamed) response?
You could transform the source using mapAsync:
val src =
  Source.fromPublisher(db.stream(getRds))
    .mapAsync(parallelism = 3)(compute)
complete(src)
Adjust the level of parallelism as needed.
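To make the effect of the parallelism setting concrete, here is a minimal sketch, assuming Akka 2.6 or later (where an implicit ActorSystem supplies the materializer). The fields of Tweet and TweetNew and the artificial delay are hypothetical stand-ins for the real types and computation. It shows that mapAsync runs up to three futures concurrently but still emits results in upstream order.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncSketch {
  // Hypothetical stand-ins for the case classes in the question.
  final case class Tweet(id: Int, text: String)
  final case class TweetNew(id: Int, upper: String)

  def run(): Seq[TweetNew] = {
    // Assumption: Akka 2.6+, so the ActorSystem provides the materializer.
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    import system.dispatcher

    // The first element is deliberately the slowest one.
    def compute(t: Tweet): Future[TweetNew] = Future {
      Thread.sleep(if (t.id == 1) 200 else 10)
      TweetNew(t.id, t.text.toUpperCase)
    }

    val tweets = List(Tweet(1, "a"), Tweet(2, "b"), Tweet(3, "c"))
    val result = Source(tweets)
      .mapAsync(parallelism = 3)(compute) // up to 3 futures in flight
      .runWith(Sink.seq)

    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

Even though the first future finishes last, the collected ids come back as 1, 2, 3. If ordering does not matter, mapAsyncUnordered takes the same arguments and emits each result as soon as it completes.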
Note that you might need to configure a few settings as mentioned in the Slick documentation:
Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.
So if you're using PostgreSQL, for example, then your Source might look something like the following:
val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)
You need a way to marshal TweetNew. Also, if you send a chunk of length 0, the client may close the connection.
This code works with curl:
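import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future
// Assumption: the global execution context is good enough for this demo.
import scala.concurrent.ExecutionContext.Implicits.global

case class TweetNew(str: String)

def compute(string: String): Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    // Render each result as one newline-terminated, non-empty chunk.
    val byteString: Source[ByteString, NotUsed] = Source(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}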
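As an alternative to building each ByteString by hand, Akka HTTP can marshal a Source of case classes directly once an EntityStreamingSupport and a per-element marshaller are in implicit scope. The following is a sketch only, assuming the akka-http-spray-json module is on the classpath; the object name and the names of the implicit values are made up for illustration.

```scala
import akka.NotUsed
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat
import scala.concurrent.{ExecutionContext, Future}

object JsonStreamingSketch {
  final case class TweetNew(str: String)

  // How a single element is turned into JSON.
  implicit val tweetNewFormat: RootJsonFormat[TweetNew] = jsonFormat1(TweetNew.apply)

  // How the elements are framed in the streamed response body.
  implicit val jsonStreaming: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  def compute(s: String)(implicit ec: ExecutionContext): Future[TweetNew] =
    Future(TweetNew(s))

  def route(implicit ec: ExecutionContext): Route =
    path("hello") {
      get {
        val tweets: Source[TweetNew, NotUsed] =
          Source(List("t1", "t2", "t3")).mapAsync(2)(s => compute(s))
        // With both implicits in scope, the Source can be completed directly.
        complete(tweets)
      }
    }
}
```

With the default settings of EntityStreamingSupport.json(), the elements should be rendered as a single JSON array that is streamed element by element, so check the framing your client expects before relying on it.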