Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can Akka streams be materialized continually?

I am using Akka Streams in Scala to poll from an AWS SQS queue using the AWS Java SDK. I created an ActorPublisher which dequeues messages on a two second interval:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

In my application, I am attempting to run the flow at a 2 second interval as well:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

However, when I run my application I receive java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] and subsequent dead letter notices which is caused by the ActorMaterializer.

Is there a recommended approach for continually materializing an Akka Stream?

like image 705
David Caseria Avatar asked Sep 12 '15 22:09

David Caseria


People also ask

Can you describe what are 3 main components of Akka streams?

Akka streams consist of 3 major components in it – Source, Flow, Sink – and any non-cyclical stream consist of at least 2 components Source, Sink and any number of Flow element. Here we can say Source and Sink are the special cases of Flow.

How does Akka stream work?

Akka Streams components work with a demand-based protocol. In other words, data flows through the graph as a response to demand from receivers. Producers then comply and send more elements downstream. A second (transparent) protocol kicks in when production of elements is faster than demand.

Is Akka streams distributed?

Unlike heavier “streaming data processing” frameworks, Akka Streams are neither “deployed” nor automatically distributed.

How does Akka handle concurrency?

Akka's approach to handling concurrency is based on the Actor Model. In an actor-based system, everything is an actor, in much the same way that everything is an object in object-oriented design.


1 Answers

I don't think you need to create a new ActorPublisher every 2 seconds. This seems redundant and wasteful of memory. Also, I don't think an ActorPublisher is necessary. From what I can tell of the code, your implementation will have an ever growing number of Streams all querying the same data. Each Message from the client will be processed by N different akka Streams and, even worse, N will grow over time.

Iterator For Infinite Loop Querying

You can get the same behavior from your ActorPublisher by using scala's Iterator. It is possible to create an Iterator which continuously queries the client:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

This implementation only queries the client when all previous Messages have been consumed and is therefore truly reactive. No need to keep track of a buffer with fixed size. Your solution needs a buffer because the creation of Messages (via a timer) is de-coupled from the consumption of Messages (via println). In my implementation, creation & consumption are tightly coupled via back-pressure.

Akka Stream Source

You can then use this Iterator generator-function to feed an akka stream Source:

def messageSource : Source[Message, _] = Source fromIterator messageIterator

Flow Formation

And finally you can use this Source to perform the println (As a side note: your flow value is actually a Sink since Flow + Sink = Sink). Using your flow value from the question:

messageSource runWith flow

One akka Stream processing all messages.

like image 180
Ramón J Romero y Vigil Avatar answered Nov 15 '22 05:11

Ramón J Romero y Vigil