
Create backpressure from a Future inside an Akka stream

I'm new to Akka streams and streams in general so I might have completely misunderstood something at a conceptual level, but is there any way I can create backpressure until a future resolves? Essentially what I want to do is like this:

object Parser {
    def parseBytesToSeq(buffer: ByteBuffer): Seq[ExampleObject] = ???
}

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .map(values => doAsyncOp(values))
  .runWith(Sink.seq)

def doAsyncOp(values: Seq[ExampleObject]): Future[Any] = ???

Bytes are read from a file and streamed to the parser, which emits Seqs of ExampleObjects, and those are streamed to an async operation that returns a Future. I want to make it so that until the Future resolves, the rest of the stream gets backpressured, then resumes once the Future is resolved, passing another Seq[ExampleObject] to doAsyncOp, which resumes the backpressure and so on.

Right now I've got this working with:

Await.result(doAsyncOp(values), 10.seconds)

But my understanding is that this locks up the entire thread and is bad. Is there any better way around it?

If it helps, the big picture is that I'm trying to parse an extremely large JSON file (too big to fit in memory) chunk-by-chunk with Jawn, then pass objects to ElasticSearch to be indexed as they're parsed - ElasticSearch has a queue of 50 pending operations, if that overflows it starts rejecting new objects.

Alex Gilleran asked Oct 07 '16 04:10




1 Answer

It's quite easy. You need to use mapAsync :)

val futures = FileIO.fromPath(path)
  .map(st => Parser.parseBytesToSeq(st.toByteBuffer))
  .batch(1000, x => x)(_ ++ _)
  .mapAsync(4)(values => doAsyncOp(values))
  .runWith(Sink.seq)

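Here 4 is the level of parallelism: at most 4 Futures are in flight at once, and mapAsync backpressures the upstream stages until one of them completes. Results are emitted downstream in the same order as the incoming elements. If only one doAsyncOp call should run at a time, as described in the question, use mapAsync(1).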
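
To see the backpressure at work, here is a minimal self-contained sketch. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream. Source(1 to 20) with grouped(5) stands in for the file source and parser, and this doAsyncOp is a hypothetical stand-in that completes after 20 ms without blocking a thread. The two counters exist only to record how many Futures were pending at the same time.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncDemo {
  // Returns the collected results and the highest number of Futures
  // that were pending at the same moment.
  def run(): (Seq[Int], Int) = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    import system.dispatcher

    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    // Hypothetical stand-in for the real async call (e.g. an indexing request).
    def doAsyncOp(values: Seq[Int]): Future[Int] = {
      val now = inFlight.incrementAndGet()
      maxInFlight.updateAndGet(m => math.max(m, now))
      after(20.millis, system.scheduler)(Future.successful(values.sum))
        .map { sum => inFlight.decrementAndGet(); sum }
    }

    val done: Future[Seq[Int]] = Source(1 to 20)
      .grouped(5)                                // stand-in for parser output
      .mapAsync(1)(values => doAsyncOp(values))  // one pending Future at a time
      .runWith(Sink.seq)

    // Awaiting here is only for the demo, at the very edge of the program;
    // nothing inside the stream blocks.
    try {
      (Await.result(done, 10.seconds), maxInFlight.get)
    } finally {
      system.terminate()
    }
  }
}
```

With mapAsync(1) the stage does not pull the next group from upstream until the pending Future has completed, so the recorded maximum stays at 1. Raising the parallelism lets that many calls overlap, which for the ElasticSearch case should stay below the queue limit of 50 mentioned in the question.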

expert answered Oct 15 '22 23:10
