
How can I use and return Source queue to caller without materializing it?

I'm trying out the new Akka Streams and wonder how I can use a Source queue and return it to the caller without materializing it in my own code.

Imagine we have a library that makes a number of async calls and returns the results via a Source. The function looks like this:

def findArticlesByTitle(text: String): Source[String, SourceQueue[String]] = {

  val source = Source.queue[String](100, backpressure)

  source.mapMaterializedValue { case queue =>

    val url = s"http://.....&term=$text"
    httpclient.get(url).map(httpResponseToSprayJson[SearchResponse]).map { v =>
      v.idlist.foreach { id =>
        queue.offer(id)
      }

      queue.complete()
    }
  }

  source
}

and the caller might use it like this:

// There is an implicit ActorMaterializer in scope somewhere
val stream = plugin.findArticlesByTitle(title)
val results = stream.runFold(List[String]())((result, article) => article :: result)

When I run this, the code within mapMaterializedValue is never executed.

I can't understand why I don't have access to the instance of SourceQueue if it should be up to the caller to decide how to materialize the source.

How should I implement this?

asked May 09 '16 10:05 by expert




1 Answer

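In your code example you're returning source instead of the return value of source.mapMaterializedValue. A Source is an immutable blueprint, so the method call doesn't mutate the Source object; it returns a new Source, and the function you pass to it only runs when that new Source is materialized. Because the original source is what gets returned and run, your function is never called.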
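A minimal sketch of the fix, under some assumptions: fetchIds is a hypothetical stand-in for the httpclient call in the question, and the return type is written as SourceQueueWithComplete because the question calls queue.complete(). The essential change is that the result of mapMaterializedValue is what the function returns. The offers are also chained so that each one waits for the previous one, which is what the backpressure strategy expects; that part is an addition, not something taken from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object QueueSourceSketch {

  // Hypothetical replacement for the HTTP lookup in the question.
  def fetchIds(text: String)(implicit ec: ExecutionContext): Future[List[String]] =
    Future(List(s"$text-1", s"$text-2", s"$text-3"))

  def findArticlesByTitle(text: String)(
      implicit ec: ExecutionContext): Source[String, SourceQueueWithComplete[String]] =
    Source
      .queue[String](100, OverflowStrategy.backpressure)
      // mapMaterializedValue returns a NEW Source; this expression is the
      // method's result, so the function below runs on each materialization.
      .mapMaterializedValue { queue =>
        fetchIds(text)
          .flatMap { ids =>
            // Offer ids one at a time, waiting for each offer to resolve.
            ids.foldLeft(Future.successful(())) { (previous, id) =>
              previous.flatMap(_ => queue.offer(id).map(_ => ()))
            }
          }
          .onComplete {
            case Success(_)  => queue.complete()
            case Failure(ex) => queue.fail(ex)
          }
        queue // keep the queue as the materialized value
      }
}

object Demo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // The caller decides when and how to materialize, as in the question.
  val results = QueueSourceSketch
    .findArticlesByTitle("akka")
    .runFold(List.empty[String])((result, article) => article :: result)

  println(Await.result(results, 5.seconds))
  system.terminate()
}
```

The caller's code from the question stays the same. The queue is only created, and the lookup only started, when the caller runs the stream, so each materialization triggers its own lookup.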

answered Nov 15 '22 03:11 by devkat