
How to refactor this code using Akka Streams?

The idea is to keep the channel open so it can be used later. The Play Framework 2.5.x documentation says you have to use Akka Streams, but it does not show how to port this example. Can somebody help me?

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket = WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send a response back to the client
  val in = Iteratee.foreach[String] { msg =>
    println(msg)
    // the Enumerator returned by Concurrent.broadcast subscribes to the channel
    // and will receive the pushed messages
    channel.push("I received your message: " + msg)
  }
  (in, out)
}
Carlos Hernandez Perez asked Mar 14 '16 17:03


2 Answers

You'll have to do something like this!

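import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import play.api.mvc.WebSocket

// Materialized once; requires an implicit Materializer in scope.
val (subscriber, publisher) = Source.asSubscriber[String]
  .toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()

def websocketAction = WebSocket.accept[String, String] { requestHeader =>
  Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher))
}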

The first statement takes a source and a sink, materializes them, and gives you the two objects you need: a subscriber to push messages into, and a publisher to subscribe to in order to receive them.

Finally, you create a flow for every WebSocket request you receive, using Flow.fromSinkAndSource.

Something that is not obvious about Akka Streams is that Sources, Sinks and Flows only describe the shape of a stream, not a running stream. Data starts flowing only when you materialize them (with run or runWith). Play expects either a Source (for Server-Sent Events) or a Flow (for WebSockets), and neither is materialized yet when you hand it over. So you need to materialize the shared stream once (the first statement) and then build a new Flow from the materialized subscriber and publisher for each connection (the websocketAction definition).
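The difference between a stream description and a materialized stream can be seen in isolation. The following is only a sketch, not part of the answer's code: it assumes the Akka 2.4-era API that ships with Play 2.5 (`ActorMaterializer`), and the names `BlueprintDemo` and `runTwice` are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BlueprintDemo {
  // Materializes the same blueprint twice and returns both results.
  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Only a description of a stream; nothing is running yet.
      val blueprint = Source(1 to 3)
        .map(_ * 2)
        .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

      // Each run() materializes a new, independent stream from the blueprint.
      val first = Await.result(blueprint.run(), 5.seconds)
      val second = Await.result(blueprint.run(), 5.seconds)
      (first, second)
    } finally {
      system.terminate()
    }
  }
}
```

Because the blueprint is reusable, both runs produce the same sum. A materialized value such as the subscriber and publisher pair above is different: it belongs to one running stream.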

I'm sorry if this is not clear enough, but that code should work.

Alejandro Navas answered Nov 05 '22 08:11


I finally found a solution using Actors. I found this:

def conect = WebSocket.accept[JsValue, JsValue] {request => 
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}

Then I looked at the source code of ActorFlow.actorRef: https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala

and came up with this solution:

import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._

import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message "+msg
  }
}
object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config:Configuration)
                          (implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s).
  // Sink.asPublisher(true) makes this a broadcast channel: multiple clients can
  // connect to it, and messages sent to outActor are broadcast to all of them.
  // Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher(true))(Keep.both).run()


  def flowsocket = WebSocket.accept[String, String] {request =>
    val aflow:Flow[String, String, _] = {

        val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )

        val source = Source.fromPublisher(publisher)

        Flow.fromSinkAndSource(
            sink, source
        )
    }
    aflow
  }

}
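The broadcast behaviour of `Sink.asPublisher(true)` can be tried outside Play. This is a rough sketch under a few assumptions: it uses the same Akka 2.4-era `Source.actorRef` signature as the controller above, the `BroadcastDemo` object and its `client` helper are hypothetical stand-ins for WebSocket connections, and a `Thread.sleep` crudely waits for both subscriptions before anything is sent.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastDemo {
  def run(): (Vector[String], Vector[String]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Same construction as in the controller: one actor in, one fan-out publisher out.
      val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher[String](fanout = true))(Keep.both).run()

      // Stand-in for one WebSocket client: collects the first two messages it sees.
      def client(): Future[Vector[String]] =
        Source.fromPublisher(publisher)
          .take(2)
          .runWith(Sink.fold[Vector[String], String](Vector.empty)(_ :+ _))

      val a = client()
      val b = client()
      Thread.sleep(500) // crude: give both clients time to subscribe

      outActor ! "one"
      outActor ! "two"
      (Await.result(a, 5.seconds), Await.result(b, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

A client that subscribes later is not guaranteed to see messages that were already delivered to earlier subscribers, which is why the sketch waits before sending.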

I have since revised my solution to more fully embrace the Actor model. I now have a "UsersBroadcastActor" which is a singleton actor that all other "UserActor"s connect to and can communicate via it:

lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}

When a UserActor is instantiated, its preStart() method sends a subscription message to broadcastActorRef, which saves references to all the UserActors that "subscribe" to it. I can then send a message to broadcastActorRef, and it forwards it to each of the UserActors. Let me know if you'd like a full code sample of this solution as well.
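The subscription pattern described above could look roughly like this. It is not the author's actual code: the message types `Subscribe`, `Broadcast` and `Deliver` are hypothetical names, and `String` is used instead of `JsValue` so that the sketch depends only on akka-actor.

```scala
import akka.actor.{Actor, ActorRef, Props, Terminated}

object UsersBroadcastActor {
  case object Subscribe                     // sent by a UserActor when it starts
  final case class Broadcast(text: String)  // asks the hub to fan a message out
  final case class Deliver(text: String)    // hub -> every subscribed UserActor
}

class UsersBroadcastActor extends Actor {
  import UsersBroadcastActor._

  private var subscribers = Set.empty[ActorRef]

  def receive = {
    case Subscribe =>
      subscribers += sender()
      context.watch(sender()) // get a Terminated message when the socket actor stops
    case Terminated(ref) =>
      subscribers -= ref
    case Broadcast(text) =>
      subscribers.foreach(_ ! Deliver(text))
  }
}

object UserActor {
  def props(out: ActorRef, hub: ActorRef): Props = Props(new UserActor(out, hub))
}

class UserActor(out: ActorRef, hub: ActorRef) extends Actor {
  import UsersBroadcastActor._

  // Register with the hub as soon as the WebSocket actor is created.
  override def preStart(): Unit = hub ! Subscribe

  def receive = {
    case text: String  => hub ! Broadcast(text) // message from this client's socket
    case Deliver(text) => out ! text            // message fanned out by the hub
  }
}
```

Watching each subscriber lets the hub drop the reference once the corresponding actor stops, so the subscriber set does not grow without bound.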

Leonya answered Nov 05 '22 08:11