
Converting a callback-method implementation into an akka stream Source

I am working with a data publisher from a Java library that I do not control. The library uses a typical callback setup; somewhere in the library code there is something like the following (the library is Java, but I will describe it in Scala for terseness):

type DataType = ???

trait DataConsumer {
  def onData(data : DataType) : Unit
}

The user of the library is required to write a class that implements the onData method and pass that into a DataProducer, the library code looks something like:

class DataProducer(consumer : DataConsumer) {...}

The DataProducer has its own internal thread, which I cannot control, and an accompanying data buffer. That thread calls onData whenever there is another DataType object to consume.

So, my question is: how do I write a layer that will convert/translate the original library pattern into an akka stream Source object?

Thank you in advance.

Ramón J Romero y Vigil asked Apr 03 '15 16:04



2 Answers

Callback --> Source

Elaborating on Endre Varga's answer, below is the code that will create the DataConsumer callback function which will send messages into an akka stream Source.

Caution: There is a lot more to creating a functional ActorPublisher than I am indicating below. In particular, buffering is needed to handle the case where the DataProducer calls onData faster than the Sink signals demand (see this example). The code below just sets up the "wiring".

import akka.actor.{ActorRef, Props}
import akka.actor.Actor.noSender

import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source

/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link
  }    
}

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) = sourceActor.tell(data, noSender)
}

//setup the actor that will feed the stream Source
//(actorFactory is any ActorRefFactory in scope, e.g. an ActorSystem)
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = new ActorConsumer(sourceActorRef)

//setup the akka stream Source
//(later Akka versions spell this Source.fromPublisher(...))
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer = new DataProducer(actorConsumer)
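
The buffering mentioned in the caution above can be modelled without Akka at all. The following is only a sketch of the bookkeeping, not the ActorPublisher API: the class name DemandBuffer, its methods, and the choice to drop new elements when the buffer is full are all assumptions made for illustration. Inside a real ActorPublisher, offer would correspond to receiving a DataType message and request to receiving a demand signal.

```scala
import scala.collection.immutable.Queue

/** Akka-free model of demand-aware buffering (hypothetical names throughout). */
final class DemandBuffer[A](maxSize: Int) {
  private var buf: Queue[A] = Queue.empty
  private var demand: Long = 0L

  /** Producer side (the onData path): store the element, then emit what is allowed.
    * If the buffer already holds maxSize elements, the new element is dropped. */
  def offer(elem: A): Vector[A] = {
    if (buf.size < maxSize) buf = buf.enqueue(elem)
    drain()
  }

  /** Consumer side: downstream asks for n more elements. */
  def request(n: Long): Vector[A] = {
    demand += n
    drain()
  }

  /** Emit as many buffered elements as the outstanding demand permits. */
  private def drain(): Vector[A] = {
    val n = math.min(demand, buf.size.toLong).toInt
    val (out, rest) = buf.splitAt(n)
    buf = rest
    demand -= n
    out.toVector
  }
}
```

Elements are only released when demand is outstanding, which is the property the "wiring" code above does not yet provide.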

Callback --> Whole Stream

The original question asks specifically for a callback to Source, but callbacks are easier to handle if the entire stream is already available (not just the Source). That is because the stream can then be materialized into an ActorRef using the Source.actorRef function. As an example:

val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val bufferSize = 100

val streamRef = 
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()

val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data
} 

val dataProducer = new DataProducer(streamConsumer)
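
To make the effect of the dropHead strategy concrete, here is a small Akka-free model of what happens to the buffer when elements arrive faster than they are consumed. It is a sketch of the semantics only (oldest element discarded to make room), not Akka's implementation; the DropHead object and its enqueue method are hypothetical names.

```scala
import scala.collection.immutable.Queue

object DropHead {
  /** Adds elem to buf. If buf already holds bufferSize elements,
    * the oldest one is discarded first to make room. */
  def enqueue[A](buf: Queue[A], elem: A, bufferSize: Int): Queue[A] = {
    require(bufferSize > 0, "bufferSize must be positive")
    val trimmed = if (buf.size >= bufferSize) buf.drop(1) else buf
    trimmed.enqueue(elem)
  }
}
```

With a buffer size of 3, pushing the elements 1 to 5 without any consumption leaves 3, 4, 5 in the buffer: the stream sees the most recent data and silently loses the oldest.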
Ramón J Romero y Vigil answered Sep 21 '22 05:09



There are various ways this can be solved. One is to use an ActorPublisher (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors), where you just change the callback so that it sends a message to the actor. Depending on how the callback works, you might be able to use mapAsync too, by converting the callback to a Future. That will only work if one request produces exactly one callback call.
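
The callback-to-Future conversion can be sketched with a Promise from the Scala standard library. This assumes a one-shot callback API, which is not the DataConsumer from the question: ResultCallback, onResult, CallbackToFuture and the submit parameter are all hypothetical stand-ins for whatever the library actually offers.

```scala
import scala.concurrent.{Future, Promise}

/** Hypothetical one-shot callback: onResult is expected exactly once per request. */
trait ResultCallback[A] {
  def onResult(result: A): Unit
}

object CallbackToFuture {
  /** submit stands in for the library call that registers the callback
    * and starts the request. */
  def apply[A](submit: ResultCallback[A] => Unit): Future[A] = {
    val promise = Promise[A]()
    submit(new ResultCallback[A] {
      // trySuccess ignores an unexpected second invocation instead of throwing
      override def onResult(result: A): Unit = {
        promise.trySuccess(result)
        ()
      }
    })
    promise.future
  }
}
```

The resulting Future is what the function passed to mapAsync would return. A Promise can be completed only once, which is why this approach does not fit a producer that calls back repeatedly.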

Endre Varga answered Sep 20 '22 05:09
