I'm rather new to Akka and am trying to learn the basics. My use case is to continually read messages from a JMS queue and output each message to a new file. I have the basic setup working with:
Source<String, NotUsed> jmsSource =
    JmsSource
        .textSource(JmsSourceSettings
            .create(connectionFactory)
            .withQueue("myQueue")
            .withBufferSize(10));

Sink<ByteString, CompletionStage<IOResult>> fileSink =
    FileIO.toFile(new File("random.txt"));

final Flow<String, ByteString, NotUsed> flow = Flow.fromFunction((String n) -> ByteString.fromString(n));

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink);

runnable.run(materializer);
However, I want the file name to be dynamic (and not hard-coded to "random.txt"): it should change depending on the content of each message on the queue. I could, of course, pick up the file name in the flow, but how do I set that name in fileSink? How do I best set this up?
I created a simple Sink based on akka.stream.impl.LazySink. I have only tested it with a single element in the successful case, so feel free to comment here or on the GitHub Gist.
import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")
  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          }
        }
      })

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {
  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}
This creates a new Sink for each element, so it avoids much of the complexity that LazySink has. It also means there is no sensible materialized value to return.
Below are three equivalent approaches: map with an inner graph, flatMapConcat with an inner graph, and a custom Sink built with the GraphDSL.
In all cases, the output is:
$ tail -n +1 -- *.txt
==> 1.txt <==
1
==> 2.txt <==
2
==> 3.txt <==
3
==> 4.txt <==
4
==> 5.txt <==
5
Using map:
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object Example {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Example")
    implicit val materializer = ActorMaterializer()

    // Each element runs its own inner graph, writing to a path derived from the element.
    val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
      .map(
        elem => Source.single(ByteString(s"$elem\n"))
          .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
      )
      .runWith(Sink.seq)

    implicit val ec = system.dispatcher
    // Wait for the individual file writes as well before shutting down.
    result.flatMap(writes => Future.sequence(writes)).onComplete(_ => system.terminate())
  }
}
Explanation:
We map each Int element with a function that creates and runs an inner graph, Source.single(ByteString(...)).runWith(FileIO.toPath(...)), which serializes the element and writes it to a dynamic path. Sink.seq then accumulates the resulting Future[IOResult] values.
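For readers coming from the Java side of the question, the shape of this approach can be sketched without Akka at all: each element is mapped to an asynchronous write whose target path is computed from the element, and the resulting futures are collected. This is only a plain-JDK analogue under assumed names (PerElementWrites, writeEach, writeOne are hypothetical). It uses CompletableFuture where the Akka version has the materialized Future[IOResult], it has no backpressure, and it is not the Akka API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Akka: a plain-JDK analogue of the map approach.
final class PerElementWrites {

    // Writes "<elem>\n" to "<elem>.txt" inside the directory and returns the byte count.
    static Long writeOne(Integer elem, Path directory) {
        byte[] bytes = (elem + "\n").getBytes(StandardCharsets.UTF_8);
        Path target = directory.resolve(elem + ".txt");
        try {
            Files.write(target, bytes);
            return (long) bytes.length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts one asynchronous write per element and collects the futures,
    // similar in spirit to accumulating Future[IOResult] values with Sink.seq.
    static List<CompletableFuture<Long>> writeEach(List<Integer> elements, Path directory) {
        return elements.stream()
            .map(elem -> CompletableFuture.supplyAsync(() -> writeOne(elem, directory)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("per-element-writes");
        List<CompletableFuture<Long>> results = writeEach(List.of(1, 2, 3, 4, 5), dir);
        // Block until every write has finished before reporting.
        results.forEach(CompletableFuture::join);
        System.out.println("Wrote " + results.size() + " files to " + dir);
    }
}
```

As in the Scala version, the outer list being available does not mean the writes are done; the individual futures have to be awaited too.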
Documentation:
map: Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.
emits when the mapping function returns an element
backpressures when downstream backpressures
completes when upstream completes
Using flatMapConcat:
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

object Example {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Example")
    implicit val materializer = ActorMaterializer()

    // Each element becomes a one-element Source carrying the inner graph's Future[IOResult].
    val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
      .flatMapConcat(
        elem => Source.single(
          Source.single(ByteString(s"$elem\n"))
            .runWith(FileIO.toPath(Paths.get(s"$elem.txt")))
        )
      )
      .runWith(Sink.seq)

    implicit val ec = system.dispatcher
    // Wait for the individual file writes as well before shutting down.
    result.flatMap(writes => Future.sequence(writes)).onComplete(_ => system.terminate())
  }
}
Explanation:
flatMapConcat requires a Source. We therefore create one that emits the materialized value of the inner graph, Source.single(ByteString(...)).runWith(FileIO.toPath(...)), which allows us to accumulate the resulting Future[IOResult] values via Sink.seq. The actual serialization and dispatch are done by the inner graph.
Documentation:
flatMapConcat: Transform each input element into a Source whose elements are then flattened into the output stream through concatenation. This means each source is fully consumed before consumption of the next source starts.
emits when the current consumed substream has an element available
backpressures when downstream backpressures
completes when upstream completes and all consumed substreams complete
Custom Sink using the GraphDSL:
import java.nio.file.Path

import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{IOResult, Materializer, SinkShape}
import akka.util.ByteString

import scala.concurrent.Future

object FileSinks {
  def dispatch[T](
    dispatcher: T => Path,
    serializer: T => ByteString
  )(
    implicit materializer: Materializer
  ): Sink[T, Future[Seq[Future[IOResult]]]] =
    Sink.fromGraph(
      GraphDSL.create(
        Sink.seq[Future[IOResult]]
      ) {
        implicit builder =>
          sink =>
            // prepare this sink's graph elements:
            val broadcast = builder.add(Broadcast[T](2))
            val serialize = builder.add(Flow[T].map(serializer))
            val dispatch = builder.add(Flow[T].map(dispatcher))
            val zipAndWrite = builder.add(ZipWith[ByteString, Path, Future[IOResult]](
              (bytes, path) => Source.single(bytes).runWith(FileIO.toPath(path)))
            )

            // connect the graph:
            import GraphDSL.Implicits._
            broadcast.out(0) ~> serialize ~> zipAndWrite.in0
            broadcast.out(1) ~> dispatch ~> zipAndWrite.in1
            zipAndWrite.out ~> sink

            // expose ports:
            SinkShape(broadcast.in)
      }
    )
}
----
import java.nio.file.Paths

// FileSinks is the object defined above; if it lives in a package, import it from there.
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

object Example {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Example")
    implicit val materializer = ActorMaterializer()

    val result: Future[Seq[Future[IOResult]]] = Source(1 to 5)
      .runWith(FileSinks.dispatch[Int](
        elem => Paths.get(s"$elem.txt"),
        elem => ByteString(s"$elem\n"))
      )

    implicit val ec = system.dispatcher
    // Wait for the individual file writes as well before shutting down.
    result.flatMap(writes => Future.sequence(writes)).onComplete(_ => system.terminate())
  }
}
dispatcher is a function that converts your input object to a path; this is where you can decide on the path dynamically.
serializer is a function that serialises your input object.
For GraphDSL, see also: https://doc.akka.io/docs/akka/current/stream/stream-graphs.html#constructing-graphs
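To relate dispatcher and serializer back to the question's String messages, here is a plain-JDK sketch (no Akka) of the two functions and how they combine for a single element. The message layout "<name>|<body>", the class MessageFiles, and both function bodies are assumptions made up for illustration; a real dispatcher would parse whatever the JMS messages actually contain.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;

// Hypothetical illustration; none of these names come from Akka or from the answer above.
final class MessageFiles {

    // Assumed message layout: "<name>|<body>". The part before the first '|' picks the file.
    // Messages without a '|' fall back to "unnamed.txt". The name is not sanitised here.
    static Function<String, Path> dispatcher(Path directory) {
        return message -> {
            int bar = message.indexOf('|');
            String name = bar < 0 ? "unnamed" : message.substring(0, bar);
            return directory.resolve(name + ".txt");
        };
    }

    // Everything after the first '|' (or the whole message if there is none), as UTF-8 bytes.
    static final Function<String, byte[]> SERIALIZER = message ->
        message.substring(message.indexOf('|') + 1).getBytes(StandardCharsets.UTF_8);

    // Applies both functions to one element and writes the bytes, returning the target path.
    static <T> Path write(T element, Function<T, Path> dispatcher, Function<T, byte[]> serializer)
            throws IOException {
        return Files.write(dispatcher.apply(element), serializer.apply(element));
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("message-files");
        Path written = write("order-42|hello", dispatcher(dir), SERIALIZER);
        System.out.println("Wrote " + written);
    }
}
```

Keeping path selection and serialisation as two separate functions mirrors the two branches of the broadcast in the graph above. Since the file name here comes straight from message content, a real dispatcher should also validate it before resolving a path.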
Disclaimer: I am myself still in the process of learning Akka Stream.