I'm creating a simple message delivery service using Akka Streams. The service works like mail delivery: each element from the source includes a destination and content, like:
case class Message(destination: String, content: String)
and the service should deliver each message to the appropriate sink based on its destination field. I created a DeliverySink class to give each sink a name:
case class DeliverySink(name: String, sink: Sink[String, Future[Done]])
Now I have instantiated two DeliverySink instances, call them sinkX and sinkY, and created a map keyed by their names. In practice, I want to provide a list of sink names, and that list should be configurable.
The challenge I'm facing is how to dynamically choose the appropriate sink based on the destination field.
Eventually, I want to connect a Flow[Message] to a sink. I tried:
val sinkNames: List[String] = List("sinkX", "sinkY")
val sinkMapping: Map[String, DeliverySink] =
sinkNames.map { name => name -> DeliverySink(name, ???)}.toMap
Flow[Message].map { msg => msg.content }.to(sinks(msg.destination).sink)
but obviously this doesn't work, because msg can't be referenced outside of map.
I guess this is not the right approach. I also thought about using filter with broadcast, but if the number of destinations grows to 100, I can't write out every route by hand. What is the right way to achieve my goal?
[Edit]
Ideally, I would like to make destinations dynamic. So, I cannot statically type all destinations in filter or routing logic. If a destination sink has not been connected, it should create a new sink dynamically too.
If You Have To Use Multiple Sinks
Sink.combine would directly suit your existing requirements. If you attach an appropriate Flow.filter before each Sink, then each one will only receive the messages meant for it.
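As a rough sketch of that idea for the two sinks from the question (the helper names `only` and `routed` are made up for illustration, and this assumes the classic Akka Streams Scala DSL is on the classpath):

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Broadcast, Flow, Sink}

object CombineDemo {
  final case class Message(destination: String, content: String)

  // Hypothetical helper: wraps `target` so it only sees the content
  // of messages addressed to `name`.
  def only(name: String, target: Sink[String, _]): Sink[Message, NotUsed] =
    Flow[Message].filter(_.destination == name).map(_.content).to(target)

  // Every message is broadcast to both filtered sinks; each sink keeps
  // only the messages whose destination matches its name.
  def routed(sinkX: Sink[String, _], sinkY: Sink[String, _]): Sink[Message, NotUsed] =
    Sink.combine(only("sinkX", sinkX), only("sinkY", sinkY))(Broadcast[Message](_))
}
```

You would then run it with something like source.runWith(CombineDemo.routed(sinkX, sinkY)). Note that every message still travels to every branch before being filtered, and the set of sinks is fixed when the graph is built, so this does not cover destinations that appear at runtime.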
Don't Use Multiple Sinks
In general I think it is bad design to have the structure and content of streams contain business logic. Your stream should be a thin veneer for back-pressured concurrency on top of business logic that lives in ordinary Scala/Java code.
In this particular case, I think it would be best to wrap your destination routing inside a single Sink, with the logic implemented in a separate function. For example:
val routeMessage : (Message) => Unit =
  (message) =>
    if (message.destination equalsIgnoreCase "stdout")
      System.out println message.content
    else if (message.destination equalsIgnoreCase "stderr")
      System.err println message.content

val routeSink : Sink[Message, _] = Sink foreach routeMessage
Note how much easier it is to test routeMessage now that it isn't inside the stream: I don't need any Akka TestKit "stuff" to test it. I can also move the function to a Future or a Thread if my concurrency design changes.
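To make the testing point concrete, here is a minimal sketch of a plain test with no Akka involved. It assumes a small variation on the function above, called routeMessageTo here (a hypothetical name), that takes the two output streams as parameters so a test can capture what was written:

```scala
import java.io.{ByteArrayOutputStream, PrintStream}

object RouteMessageCheck {
  final case class Message(destination: String, content: String)

  // Same routing rules as routeMessage, but the streams are passed in
  // (an assumption made for testability, not part of the original code).
  def routeMessageTo(out: PrintStream, err: PrintStream): Message => Unit =
    message =>
      if (message.destination.equalsIgnoreCase("stdout")) out.println(message.content)
      else if (message.destination.equalsIgnoreCase("stderr")) err.println(message.content)

  // Routes the given messages and returns what ended up on each stream.
  def capture(messages: List[Message]): (String, String) = {
    val outBuf = new ByteArrayOutputStream()
    val errBuf = new ByteArrayOutputStream()
    val route = routeMessageTo(new PrintStream(outBuf, true), new PrintStream(errBuf, true))
    messages.foreach(route)
    (outBuf.toString.trim, errBuf.toString.trim)
  }
}
```

In production you would pass System.out and System.err, and the Sink stays a one-liner: Sink.foreach(routeMessageTo(System.out, System.err)).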
Many Destinations
If you have many destinations you can use a Map. Suppose, for example, you are sending your messages to Amazon SQS. You could define a function that converts a queue name to a queue URL and use it to maintain a Map of the queues already created:
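import scala.collection.mutable
import com.amazonaws.services.sqs.AmazonSQS
import com.amazonaws.services.sqs.model.CreateQueueRequest

type QueueName = String
val nameToRequest : (QueueName) => CreateQueueRequest = ??? //implementation unimportant
type QueueURL = String

val nameToURL : (AmazonSQS) => (QueueName) => QueueURL = {
  // Created once and closed over by the returned function.
  // Not thread-safe: call it from one place at a time.
  val cache = mutable.Map.empty[QueueName, QueueURL]
  (sqs) => (queueName) => cache.get(queueName) match {
    case Some(url) => url
    case None =>
      // First time this name is seen: create the queue and remember its URL.
      sqs.createQueue(nameToRequest(queueName))
      val url = sqs.getQueueUrl(queueName).getQueueUrl()
      cache.put(queueName, url)
      url
  }
}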
Now you can use this non-stream function inside a single Sink:
import com.amazonaws.services.sqs.model.SendMessageRequest

val sendMessage : (AmazonSQS) => (Message) => Unit =
  (sqs) => (message) =>
    sqs.sendMessage(
      new SendMessageRequest()
        .withQueueUrl(nameToURL(sqs)(message.destination))
        .withMessageBody(message.content)
    )

val sqs : AmazonSQS = ???
val messageSink = Sink foreach sendMessage(sqs)
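The same "look up, or create on first use" pattern works for any kind of destination, which is what the dynamic requirement in the question's edit asks for. Below is a minimal sketch in plain Scala; Mailbox and Router are hypothetical stand-ins for a real destination and its registry, not part of Akka or the AWS SDK, and the mutable map assumes route is only called from one place at a time (as it would be from a single Sink.foreach):

```scala
import scala.collection.mutable

object DestinationRegistry {
  final case class Message(destination: String, content: String)

  // Hypothetical destination: it just remembers what it was sent.
  final class Mailbox(val name: String) {
    private val received = mutable.ListBuffer.empty[String]
    def deliver(content: String): Unit = { received += content; () }
    def contents: List[String] = received.toList
  }

  // Keeps one Mailbox per destination name and creates it lazily.
  final class Router(create: String => Mailbox) {
    private val known = mutable.Map.empty[String, Mailbox]

    // getOrElseUpdate only evaluates `create(...)` when the key is missing.
    def route(message: Message): Unit =
      known
        .getOrElseUpdate(message.destination, create(message.destination))
        .deliver(message.content)

    def destinations: Set[String] = known.keySet.toSet
    def mailbox(name: String): Option[Mailbox] = known.get(name)
  }
}
```

With a router built as new Router(name => new Mailbox(name)), the stream side stays a single Sink.foreach(router.route), and new destinations need no change to the graph.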
Side Note
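For destination you probably want to use something other than String. A coproduct (a sealed trait with a fixed set of cases) is usually better because it can be used with case statements, and the compiler will warn you about a non-exhaustive match if you miss one of the possibilities (or fail the build, if you treat warnings as errors):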
sealed trait Destination
case object Out extends Destination
case object Err extends Destination
case object SomethingElse extends Destination

case class Message(destination: Destination, content: String)

// The compiler flags this match as non-exhaustive because SomethingElse
// has no case (a warning by default, an error with -Xfatal-warnings).
val routeMessage : (Message) => Unit =
  (message) => message.destination match {
    case Out =>
      System.out.println(message.content)
    case Err =>
      System.err.println(message.content)
  }
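For comparison, here is a small sketch of the same idea with every case covered, so the match compiles cleanly. The describe function is a hypothetical stand-in for routeMessage that returns a label instead of printing, which keeps the result easy to check:

```scala
object Destinations {
  sealed trait Destination
  case object Out extends Destination
  case object Err extends Destination
  case object SomethingElse extends Destination

  final case class Message(destination: Destination, content: String)

  // All three cases are handled, so the exhaustiveness check passes.
  // Adding a fourth Destination later would flag this match again.
  def describe(message: Message): String = message.destination match {
    case Out           => s"stdout: ${message.content}"
    case Err           => s"stderr: ${message.content}"
    case SomethingElse => s"other: ${message.content}"
  }
}
```

The trade-off is that a sealed trait fixes the set of destinations at compile time, so it suits a known, closed set of routes rather than destinations discovered at runtime.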
Given your requirement, you may want to consider multiplexing your stream source into substreams using groupBy:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.util.ByteString
import akka.{NotUsed, Done}
import akka.stream.IOResult
import scala.concurrent.Future
import java.nio.file.Paths
import java.nio.file.StandardOpenOption._
implicit val system = ActorSystem("sys")
implicit val materializer = ActorMaterializer()
import system.dispatcher
case class Message(destination: String, content: String)
case class DeliverySink(name: String, sink: Sink[ByteString, Future[IOResult]])
val messageSource: Source[Message, NotUsed] = Source(List(
Message("a", "uuu"), Message("a", "vvv"),
Message("b", "xxx"), Message("b", "yyy"), Message("b", "zzz")
))
val sinkA = DeliverySink("sink-a", FileIO.toPath(
Paths.get("/path/to/sink-a.txt"), options = Set(CREATE, WRITE)
))
val sinkB = DeliverySink("sink-b", FileIO.toPath(
Paths.get("/path/to/sink-b.txt"), options = Set(CREATE, WRITE)
))
val sinkMapping: Map[String, DeliverySink] = Map("a" -> sinkA, "b" -> sinkB)
val totalDests = 2
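messageSource.map(m => (m.destination, m)).
  // One substream per destination, up to totalDests substreams.
  groupBy(totalDests, _._1).
  // Collect each substream's messages into a list (emitted when the substream completes).
  fold(("", List.empty[Message])) {
    case ((_, list), (dest, msg)) => (dest, msg :: list)
  }.
  // Write each destination's messages, in original order, to its own sink.
  mapAsync(parallelism = totalDests) {
    case (dest: String, msgList: List[Message]) =>
      Source(msgList.reverse).map(_.content).map(ByteString(_)).
        runWith(sinkMapping(dest).sink)
  }.
  mergeSubstreams.
  runWith(Sink.ignore)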