I'm tinkering with Akka and need some advice how to implement something specific i have in mind. I want to have an actor which i can send a DownloadFile(URI, File)
message and downloads it. Since this can be paralleled, I don't want to download file after file but have a limit of concurrent downloads.
Whats the intended way to model something like this with Akka? Other things that come to mind are: What happens if one of the "worker" actor dies for some reason? How to retry the download? Etc. etc.
I know this is a very huge question but i hope someone takes the time to answer it! Thank you!
What is an Actor in Akka? An actor is essentially nothing more than an object that receives messages and takes actions to handle them. It is decoupled from the source of the message and its only responsibility is to properly recognize the type of message it has received and take action accordingly.
Akka streams consist of 3 major components in it – Source, Flow, Sink – and any non-cyclical stream consist of at least 2 components Source, Sink and any number of Flow element.
Akka Actors The Actor Model provides a higher level of abstraction for writing concurrent and distributed systems. It alleviates the developer from having to deal with explicit locking and thread management, making it easier to write correct concurrent and parallel systems.
Give this a shot; it creates three - but you could configure it to create as many as you like - downloaders, so that three download requests could be processed concurrenty.
sealed trait DownloaderMessage
case class DownloadFile(uri: URI, file: File) extends DownloaderMessage
object Downloader {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pool").build
}
class Downloader extends Actor {
self.lifeCycle = Permanent
self.dispatcher = Downloader.dispatcher
def receive = {
case DownloadFile(uri, file) =>
// do the download
}
}
trait CyclicLoadBalancing extends LoadBalancer { this: Actor =>
val downloaders: List[ActorRef]
val seq = new CyclicIterator[ActorRef](downloaders)
}
trait DownloadManager extends Actor {
self.lifeCycle = Permanent
self.faultHandler = OneForOneStrategy(List(classOf[Exception]), 5, 5000)
val downloaders: List[ActorRef]
override def preStart = downloaders foreach { self.startLink(_) }
override def postStop = self.shutdownLinkedActors()
}
class DownloadService extends DownloadManager with CyclicLoadBalancing {
val downloaders = List.fill(3)(Actor.actorOf[Downloader])
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With