How are reactive streams used in Slick for inserting data

In Slick's documentation, the examples for Reactive Streams only cover reading data by means of a DatabasePublisher. But what happens when you want to use your database as a Sink and apply backpressure based on your insertion rate?

I've looked for an equivalent DatabaseSubscriber, but it doesn't exist. So the question is, if I have a Source, say:

val source = Source(0 to 100)

how can I create a Sink with Slick that writes those values into a table with this schema:

create table NumberTable (value INT)

tonicebrian asked Apr 04 '16 10:04



2 Answers

Serial Inserts

The easiest way would be to do inserts within a Sink.foreach.

Assuming you've used the schema code generation, and further assuming your table is named "NumberTable":

//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig"

We can write a function that does the insertion

def insertIntoDb(num : Int) = 
  numberTableDB run (Numbertable += NumbertableRow(num))

And that function can be placed in the Sink

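// Sink.foreach expects an Int => Unit function, so the Future returned by insertIntoDb is discarded
val insertSink = Sink.foreach[Int](num => insertIntoDb(num))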

Source(0 to 100) runWith insertSink

Batched Inserts

You could further extend the Sink methodology by batching N inserts at a time:

def batchInsertIntoDb(nums : Seq[Int]) = 
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))

val batchInsertSink = Sink.foreach[Seq[Int]](nums => batchInsertIntoDb(nums))

This batched Sink can be fed by a Flow which does the batch grouping:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(batchInsertSink)
Ramón J Romero y Vigil answered Oct 14 '22 06:10



Although you can use Sink.foreach to achieve this (as mentioned by Ramon), it is safer and likely faster (by running the inserts in parallel) to use the mapAsync Flow. The problem with Sink.foreach is that it discards the return value of its function. Inserting into a database via Slick's db.run method returns a Future, and that Future escapes the stream: the stream's own Future[Done] completes as soon as Sink.foreach has processed the last element, not when the inserts have actually finished.

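// Imports assumed by the snippets below. The profile path is for Slick 3.2+;
// older versions use slick.driver.PostgresDriver.api._ instead.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import slick.jdbc.PostgresProfile.api._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher // required by Future.foreach and Future.map below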

class Numbers(tag: Tag) extends Table[Int](tag, "NumberTable") {
  def value = column[Int]("value")
  def * = value
}

val numbers = TableQuery[Numbers]

val db = Database.forConfig("postgres")
Await.result(db.run(numbers.schema.create), Duration.Inf)

val streamFuture: Future[Done] = Source(0 to 100)
  .runWith(Sink.foreach[Int] { (i: Int) =>
    db.run(numbers += i).foreach(_ => println(s"stream 1 insert $i done"))
  })
Await.result(streamFuture, Duration.Inf)
println("stream 1 done")

//// sample 1 output: ////
// stream 1 insert 1 done
// ...
// stream 1 insert 99 done
// stream 1 done    <-- stream Future[Done] returned before inserts finished
// stream 1 insert 100 done

On the other hand, the mapAsync Flow, def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]), lets you run the inserts in parallel via the parallelism parameter and accepts a function from the upstream element to a Future of some type. This matches our i => db.run(numbers += i) function. The great thing about this Flow is that it feeds the results of these Futures downstream, so the stream cannot complete before every insert has completed.

val streamFuture2: Future[Done] = Source(0 to 100)
  .mapAsync(1) { (i: Int) =>
    db.run(numbers += i).map { r => println(s"stream 2 insert $i done"); r }
  }
  .runWith(Sink.ignore)
Await.result(streamFuture2, Duration.Inf)
println("stream 2 done")

//// sample 2 output: ////
// stream 2 insert 1 done
// ...
// stream 2 insert 100 done
// stream 2 done    <-- stream Future[Done] returned after inserts finished

To prove the point, you can even return a real result from the stream rather than a Future[Done] (with Done representing Unit). This stream also uses a higher parallelism value and batching for extra performance.*

val streamFuture3: Future[Int] = Source(0 to 100)
  .via(Flow[Int].grouped(10)) // Batch in size 10
  .mapAsync(2)((ints: Seq[Int]) => db.run(numbers ++= ints).map(_.getOrElse(0))) // Insert batches in parallel, return insert count
  .runWith(Sink.fold[Int, Int](0)(_ + _)) // sum the per-batch insert counts and return the total
val rowsInserted = Await.result(streamFuture3, Duration.Inf)
println(s"stream 3 done, inserted $rowsInserted rows")

// sample 3 output:
// stream 3 done, inserted 101 rows
* Note: You probably won't see better performance for such a small data set, but when I was dealing with a 1.7M-row insert I got the best performance on my machine with a batch size of 1000 and a parallelism value of 8, running PostgreSQL locally. This was about twice as fast as not running in parallel. As always when dealing with performance, your results may vary and you should measure for yourself.
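
Since the question asks for a Sink specifically, the batching and mapAsync stages can also be packaged as a reusable Sink whose materialized value is the total row count. The following is only a minimal sketch under stated assumptions: insertBatch is a hypothetical stand-in for db.run(numbers ++= batch).map(_.getOrElse(0)) so that the example runs without a database, and the names InsertSinkSketch, insertSink and run are introduced here purely for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InsertSinkSketch {

  // Hypothetical stand-in for db.run(numbers ++= batch).map(_.getOrElse(0)):
  // pretends every row in the batch was inserted and reports the count.
  def insertBatch(batch: Seq[Int])(implicit ec: ExecutionContext): Future[Int] =
    Future(batch.size)

  // A reusable Sink: group elements into batches, run up to `parallelism`
  // inserts at once, and materialize the total number of rows inserted.
  def insertSink(batchSize: Int, parallelism: Int)(
      implicit ec: ExecutionContext): Sink[Int, Future[Int]] =
    Flow[Int]
      .grouped(batchSize)
      .mapAsync(parallelism)(batch => insertBatch(batch))
      .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  // Runs the 0 to 100 source from the question through the sink.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher
    try Await.result(Source(0 to 100).runWith(insertSink(10, 2)), 30.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(s"inserted ${run()} rows") // 0 to 100 is 101 elements
}
```

Because mapAsync stops pulling new batches once `parallelism` inserts are in flight, a slow database slows the upstream Source down, which is the backpressure behavior the question asks about. Swapping insertBatch for a real db.run call should be the only change needed, assuming the table definition from this answer.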
Chris Balogh answered Oct 14 '22 06:10
