I want to poll an API endpoint until it reaches some condition. I expect it to reach this condition in couple of seconds to a minute. I have a method to call the endpoint that returns a Future
. Is there some way I can chain Future
s together to poll this endpoint every n
milliseconds and give up after t
tries?
Assume I have a function with the following signature:
def isComplete(): Future[Boolean] = ???
The simplest way to do this in my opinion would be to make everything blocking:
def untilComplete(): Unit = {
for { _ <- 0 to 10 } {
val status = Await.result(isComplete(), 1.seconds)
if (status) return Unit
Thread.sleep(100)
}
throw new Error("Max attempts")
}
But this may occupy all the threads and it is not asynchronous. I also considered doing it recursively:
def untilComplete(
f: Future[Boolean] = Future.successful(false),
attempts: Int = 10
): Future[Unit] = f flatMap { status =>
if (status) Future.successful(Unit)
else if (attempts == 0) throw new Error("Max attempts")
else {
Thread.sleep(100)
untilComplete(isComplete(), attempts - 1)
}
}
However, I am concerned about maxing out the call stack since this is not tail recursive.
Is there a better way of doing this?
Edit: I am using akka
Future. Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.
The Promise is a writable, single-assignment container that completes a Future. The Promise is similar to the Future. However, the Future is about the read-side of an asynchronous operation, while the Promise is about the write-side.
Await. result tries to return the Future result as soon as possible and throws an exception if the Future fails with an exception while Await. ready returns the completed Future from which the result (Success or Failure) can safely be extracted.
You could use Akka Streams. For example, to call isComplete
every 500 milliseconds until the result of the Future
is true, up to a maximum of five times:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._
def isComplete(): Future[Boolean] = ???
implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val stream: Future[Option[Boolean]] =
Source(1 to 5)
.throttle(1, 500 millis)
.mapAsync(parallelism = 1)(_ => isComplete())
.takeWhile(_ == false, true)
.runWith(Sink.lastOption)
stream onComplete { result =>
println(s"Stream completed with result: $result")
system.terminate()
}
I've given myself a library to do this. I have
trait Poller extends AutoCloseable {
def addTask[T]( task : Poller.Task[T] ) : Future[T]
def close() : Unit
}
where a Poller.Task
looks like
class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )
The Poller
polls every period
until the pollFor
method succeeds (yields a Some[T]
) or the timeout
is exceeded.
As a convenience, when I begin polling, I wrap this into a Poller.Task.withDeadline
:
final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}
which converts the (immutable, reusable) timeout
Duration of the task into a per-poll-attempt deadline for timing out.
To do the polling efficiently, I use Java's ScheduledExecutorService
:
def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
val promise = Promise[T]()
scheduleTask( Poller.Task.withDeadline( task ), promise )
promise.future
}
private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
if ( isClosed ) {
promise.failure( new Poller.ClosedException( this ) )
} else {
val task = twd.task
val deadline = twd.deadline
val runnable = new Runnable {
def run() : Unit = {
try {
if ( ! twd.timedOut ) {
task.pollFor() match {
case Some( value ) => promise.success( value )
case None => Abstract.this.scheduleTask( twd, promise )
}
} else {
promise.failure( new Poller.TimeoutException( task.label, deadline ) )
}
}
catch {
case NonFatal( unexpected ) => promise.failure( unexpected )
}
}
}
val millis = task.period.toMillis
ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
}
}
It seems to work well, without requiring sleeping or blocking of individual Threads
.
(Looking at the library, there's lots that could be done to make it clearer, easier to read, and the role of Poller.Task.withDeadline
would be clarified by making the raw constructor for that class private
. The deadline should always be computed from the task timeout
, should not be an arbitrary free variable.)
This code comes from here (framework and trait) and here (implementation). (If you want to use it outright maven coordinates are here.)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With