
Transforming Slick Streaming data and sending Chunked Response using Akka Http

The aim is to stream data from a database, perform a computation on each chunk of data (the computation returns a Future of some case class), and send the result to the user as a chunked response. Currently I can stream the data and send the response without performing any computation, but I have not been able to run the computation and then stream its result.

This is the route I have implemented.

def streamingDB1 =
path("streaming-db1") {
  get {
    val src = Source.fromPublisher(db.stream(getRds))
    complete(src)
  }
}

The function getRds returns the rows of a table mapped to a case class (using Slick). Now consider a function compute that takes each row as input and returns a Future of another case class, something like:

def compute(x: Tweet): Future[TweetNew] = ???

How can I apply this function to src and send the result of the computation to the user as a chunked (streamed) response?

user3294786 asked Dec 27 '17 12:12


2 Answers

You could transform the source using mapAsync:

val src =
  Source.fromPublisher(db.stream(getRds))
        .mapAsync(parallelism = 3)(compute)

complete(src)

Adjust the level of parallelism as needed.
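
To illustrate what the parallelism setting does, here is a minimal, self-contained sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream), and Tweet, TweetNew and compute are hypothetical stand-ins for the ones in the question. mapAsync keeps up to `parallelism` futures in flight but still emits results in upstream order; mapAsyncUnordered is the variant to look at if order does not matter.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object MapAsyncDemo {
  // Hypothetical stand-ins for the question's Tweet and TweetNew.
  final case class Tweet(id: Int)
  final case class TweetNew(id: Int, text: String)

  // Stand-in for compute: lower ids sleep longer, so the futures finish out of order.
  def compute(t: Tweet)(implicit ec: ExecutionContext): Future[TweetNew] = Future {
    Thread.sleep((4 - t.id) * 50L)
    TweetNew(t.id, s"tweet-${t.id}")
  }

  // Runs the stream and collects the ids of the computed elements.
  def computedIds()(implicit system: ActorSystem): Future[Seq[Int]] = {
    import system.dispatcher
    Source(List(Tweet(1), Tweet(2), Tweet(3)))
      .mapAsync(parallelism = 3)(t => compute(t)) // up to 3 futures at once, upstream order kept
      .map(_.id)
      .runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    try println(Await.result(computedIds(), 5.seconds).mkString(", ")) // prints 1, 2, 3
    finally system.terminate()
  }
}
```

Even though the future for Tweet(3) completes first, the output order follows the source, which is usually what you want when streaming database rows to a client.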


Note that you might need to configure a few settings as mentioned in the Slick documentation:

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

So if you're using PostgreSQL, for example, then your Source might look something like the following:

val src =
  Source.fromPublisher(
    db.stream(
      getRds.withStatementParameters(
        rsType = ResultSetType.ForwardOnly,
        rsConcurrency = ResultSetConcurrency.ReadOnly,
        fetchSize = 10
      ).transactionally
    )
  ).mapAsync(parallelism = 3)(compute)
Jeffrey Chung answered Nov 11 '22 18:11


You need a way to marshal TweetNew. Also, if you send a chunk of length 0, the client may close the connection, since a zero-length chunk terminates a chunked HTTP body.

This code works with curl:

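import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.concurrent.Future
// Assumes an implicit ExecutionContext (e.g. system.dispatcher) is in scope for Future { ... }.

case class TweetNew(str: String)

def compute(string: String): Future[TweetNew] = Future {
  TweetNew(string)
}

val route = path("hello") {
  get {
    // End each element with a newline so no chunk is ever empty.
    val byteString: Source[ByteString, NotUsed] = Source(List("t1", "t2", "t3"))
      .mapAsync(2)(compute)
      .map(tweet => ByteString(tweet.str + "\n"))
    complete(HttpEntity(ContentTypes.`text/plain(UTF-8)`, byteString))
  }
}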
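
If you would rather have Akka HTTP marshal TweetNew for you instead of building each ByteString by hand, one option is its JSON entity streaming support. The following is only a sketch under some assumptions: it needs the akka-http-spray-json module on the classpath, and the object name TweetJsonRoute and the path "hello-json" are made up for illustration. With an implicit JsonEntityStreamingSupport and a spray-json format in scope, complete accepts a Source[TweetNew, _] directly; as far as I know the default EntityStreamingSupport.json() renders the elements as a JSON array.

```scala
import akka.NotUsed
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport}
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import scala.concurrent.{ExecutionContext, Future}

final case class TweetNew(str: String)

// Hypothetical object name, used only for this illustration.
object TweetJsonRoute {
  // spray-json format for TweetNew; SprayJsonSupport turns it into a marshaller.
  implicit val tweetNewFormat: RootJsonFormat[TweetNew] = jsonFormat1(TweetNew.apply)

  // Tells Akka HTTP how to frame the elements of a streamed Source as JSON.
  implicit val jsonStreaming: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  // Same stand-in computation as in the answer above.
  def compute(s: String)(implicit ec: ExecutionContext): Future[TweetNew] =
    Future(TweetNew(s))

  def route(implicit ec: ExecutionContext): Route =
    path("hello-json") {
      get {
        val tweets: Source[TweetNew, NotUsed] =
          Source(List("t1", "t2", "t3")).mapAsync(2)(s => compute(s))
        complete(tweets) // each element is marshalled and streamed as it becomes available
      }
    }
}
```

In the original question the list source would simply be replaced by Source.fromPublisher(db.stream(getRds)).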
Chetan Kumar Meena answered Nov 11 '22 19:11