 

Using dynamic sink destination in Akka Streams

I'm rather new to Akka and am trying to learn the basics. My use case is to continually read messages from a JMS queue and output each message to a new file. I have the basic setup working with:

Source<String, NotUsed> jmsSource =
  JmsSource
    .textSource(JmsSourceSettings
      .create(connectionFactory)
      .withQueue("myQueue")
      .withBufferSize(10));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
  FileIO.toFile(new File("random.txt"));

final Flow<String, ByteString, NotUsed> flow = Flow.fromFunction((String n) -> ByteString.fromString(n));

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink);

runnable.run(materializer);

However, I want the file name to be dynamic (not hard-coded to "random.txt"): it should depend on the content of each message on the queue. I could, of course, extract the file name in the flow, but how do I then pass that name to fileSink? What is the best way to set this up?

Christian Morin asked Jul 19 '17 13:07


2 Answers

I created a simple Sink based on akka.stream.impl.LazySink. I have only tested it with a single element in the successful case, so feel free to comment here or on the GitHub Gist.

import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")
  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

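        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            // The inner sink cancelled before pulling: resume the outer stream,
            // otherwise nothing would ever pull `in` again and the stage would stall.
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }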
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {

  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}

This creates a new Sink for each element, which avoids much of the complexity that LazySink has. Because every element gets its own inner sink, there is also no single sensible materialized value to return, so the resulting Sink materializes to NotUsed.
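If you would rather not write a custom GraphStage, a similar per-element behavior can probably be approximated with mapAsync: materialize a fresh inner sink for each element and wait for its Future before requesting more. The sketch below assumes the Akka 2.5/2.6 classic Scala API used elsewhere on this page; OnDemandSinks.oneToOneOnDemand, OnDemandExample and the "<fileName>:<body>" message format are hypothetical names chosen for illustration. Unlike OneToOneOnDemandSink, it requires the inner sink to materialize to a Future, which it uses for backpressure and to complete its own Future[Done] only after every inner sink has finished.

```scala
import java.nio.file.Paths

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// Hypothetical helper, not part of Akka: a mapAsync-based alternative to a custom stage.
object OnDemandSinks {
  // Materializes a fresh inner sink for every element, feeds it that one element,
  // and waits for the inner sink's Future before pulling more
  // (at most `parallelism` inner sinks are running at any time).
  def oneToOneOnDemand[T, M](parallelism: Int)(sink: T => Sink[T, Future[M]])(
      implicit mat: Materializer): Sink[T, Future[Done]] =
    Flow[T]
      .mapAsync(parallelism)(elem => Source.single(elem).runWith(sink(elem)))
      .toMat(Sink.ignore)(Keep.right)
}

object OnDemandExample {
  // Assumed message format, for illustration only: "<fileName>:<body>".
  def fileSinkFor(msg: String): Sink[String, Future[IOResult]] = {
    val fileName = msg.takeWhile(_ != ':') + ".txt"
    Flow[String]
      .map(m => ByteString(m.dropWhile(_ != ':').drop(1))) // keep only the body
      .toMat(FileIO.toPath(Paths.get(fileName)))(Keep.right)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("OnDemandExample")
    implicit val materializer: Materializer = ActorMaterializer()

    val done: Future[Done] = Source(List("a:first", "b:second"))
      .runWith(OnDemandSinks.oneToOneOnDemand[String, IOResult](parallelism = 1)(fileSinkFor))

    Await.result(done, 10.seconds) // afterwards a.txt contains "first", b.txt contains "second"
    system.terminate()
  }
}
```

For the JMS case in the question, fileSinkFor would be replaced by whatever logic derives the file name from the message content.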

steinybot answered Sep 28 '22 11:09


Below are three equivalent approaches:

  • a terse one, using map and an inner graph,
  • a terse one, using flatMapConcat and an inner graph,
  • a more verbose one, using the GraphDSL.

In all cases, the output is:

$ tail -n +1 -- *.txt
==> 1.txt <==
1

==> 2.txt <==
2

==> 3.txt <==
3

==> 4.txt <==
4

==> 5.txt <==
5

  1. Using map:

    import java.nio.file.Paths
    
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.{FileIO, Sink, Source}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object Example {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Example")
        implicit val materializer = ActorMaterializer()
    
        val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
          .map(
            elem => Source.single(ByteString(s"$elem\n"))
                .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
          )
          .runWith(Sink.seq)
    
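        implicit val ec = system.dispatcher
        // Wait for the writes themselves, not just for the outer stream, before terminating.
        result
          .flatMap(ioResults => Future.sequence(ioResults))
          .onComplete(_ => system.terminate())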
      }
    }
    

    Explanation: We map each Int element with a function that creates and runs an inner graph, Source.single(ByteString(...)).runWith(FileIO.toPath(...)), which serializes the element and writes it to a dynamic path. The resulting Future[IOResult] values are accumulated via Sink.seq.

    Documentation:

    map Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.

    emits when the mapping function returns an element

    backpressures when downstream backpressures

    completes when upstream completes

    See also:

    • https://doc.akka.io/docs/akka/current/stream/stages-overview.html#map

  2. Using flatMapConcat:

    import java.nio.file.Paths
    
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.{FileIO, Sink, Source}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object Example {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Example")
        implicit val materializer = ActorMaterializer()
    
        val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
          .flatMapConcat(
            elem => Source.single(
              Source.single(ByteString(s"$elem\n"))
                .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
            )
          )
          .runWith(Sink.seq)
    
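        implicit val ec = system.dispatcher
        // Wait for the writes themselves, not just for the outer stream, before terminating.
        result
          .flatMap(ioResults => Future.sequence(ioResults))
          .onComplete(_ => system.terminate())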
      }
    }
    

    Explanation: flatMapConcat requires a function that returns a Source. We therefore create one which emits the materialized value of the inner graph, Source.single(ByteString(...)).runWith(FileIO.toPath(...)), which allows us to accumulate the resulting Future[IOResult] via Sink.seq. The actual serialization and writing are done by the inner graph.

    Documentation:

    flatMapConcat Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.

    emits when the current consumed substream has an element available

    backpressures when downstream backpressures

    completes when upstream completes and all consumed substreams complete

    See also:

    • https://doc.akka.io/docs/akka/current/stream/stages-overview.html#flatmapconcat
    • https://doc.akka.io/docs/akka/current/stream/stream-substream.html#flatmapconcat

  3. Custom Sink using the GraphDSL:

    import java.nio.file.Path
    
    import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Sink, Source, ZipWith}
    import akka.stream.{IOResult, Materializer, SinkShape}
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object FileSinks {
      def dispatch[T](
                       dispatcher: T => Path,
                       serializer: T => ByteString
                     )(
                       implicit materializer: Materializer
                     ): Sink[T, Future[Seq[Future[IOResult]]]] =
        Sink.fromGraph(
          GraphDSL.create(
            Sink.seq[Future[IOResult]]
          ) {
            implicit builder =>
              sink =>
                // prepare this sink's graph elements:
                val broadcast = builder.add(Broadcast[T](2))
                val serialize = builder.add(Flow[T].map(serializer))
                val dispatch = builder.add(Flow[T].map(dispatcher))
                val zipAndWrite = builder.add(ZipWith[ByteString, Path, Future[IOResult]](
                  (bytes, path) => Source.single(bytes).runWith(FileIO.toPath(path)))
                )
    
                // connect the graph:
                import GraphDSL.Implicits._
                broadcast.out(0) ~> serialize ~> zipAndWrite.in0
                broadcast.out(1) ~> dispatch ~> zipAndWrite.in1
                zipAndWrite.out ~> sink
    
                // expose ports:
                SinkShape(broadcast.in)
          }
        )
    }
    ----
    import java.nio.file.Paths
    
    // FileSinks (defined above) is assumed to be in the same package, so no import is needed.
    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.Source
    import akka.util.ByteString
    
    import scala.concurrent.Future
    
    object Example {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Example")
        implicit val materializer = ActorMaterializer()
    
        val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
          .runWith(FileSinks.dispatch[Int](
            elem => Paths.get(s"$elem.txt"),
            elem => ByteString(s"$elem\n"))
          )
    
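        implicit val ec = system.dispatcher
        // Wait for the writes themselves, not just for the outer stream, before terminating.
        result
          .flatMap(ioResults => Future.sequence(ioResults))
          .onComplete(_ => system.terminate())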
      }
    }
    
    • dispatcher is a function that converts your input object to a path; this is where you dynamically decide on the path.
    • serializer is a function that serializes your input object to a ByteString.
    • the rest uses the GraphDSL; see also: https://doc.akka.io/docs/akka/current/stream/stream-graphs.html#constructing-graphs
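In the three approaches above, result completes as soon as every inner graph has been materialized, not when the writes have finished, so the individual Future[IOResult] values still have to be awaited separately. If you would rather have the stream itself wait for each write, one option is to wrap the inner graph's materialized Future in a Source inside flatMapConcat. This is a minimal sketch assuming the same Akka 2.5-era classic API as above (Source.fromFuture is deprecated in Akka 2.6 in favor of Source.future); the object name SequentialWrites is illustrative only. Since flatMapConcat consumes one inner Source at a time, my understanding is that this also limits the stream to one file being written at a time.

```scala
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object SequentialWrites {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("SequentialWrites")
    implicit val materializer: Materializer = ActorMaterializer()

    val results: Future[Seq[IOResult]] = Source(1 to 5)
      .flatMapConcat { elem =>
        // Run the inner graph; Source.fromFuture emits its IOResult only once the
        // write has finished, and flatMapConcat fully consumes this single-element
        // Source before it moves on to the next one.
        Source.fromFuture(
          Source.single(ByteString(s"$elem\n")).runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
        )
      }
      .runWith(Sink.seq)

    // Here the outer Future only completes after every file has been written.
    val written = Await.result(results, 10.seconds)
    println(s"wrote ${written.map(_.count).sum} bytes to ${written.size} files") // prints "wrote 10 bytes to 5 files"
    system.terminate()
  }
}
```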

Disclaimer: I am myself still in the process of learning Akka Streams.

Marc Carré answered Sep 28 '22 13:09