I want to implement a timer using Kotlin coroutines, similar to this one implemented with RxJava:
Flowable.interval(0, 5, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .map { LocalDateTime.now() }
    .distinctUntilChanged { old, new ->
        old.minute == new.minute
    }
    .subscribe {
        setDateTime(it)
    }
It will emit LocalDateTime every new minute.
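Standard library: The language-level support for coroutines (suspend functions and the Continuation interface) lives in the Kotlin standard library, while higher-level APIs such as Job, Deferred, and CoroutineScope come from the kotlinx.coroutines library. A coroutine uses a Continuation to capture its execution state at each suspension point. The lifecycle of a running coroutine is modeled by the Job interface. Calling async returns a Deferred, which is a Job that also carries a result.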
Kotlin coroutines provide an API for writing asynchronous code. You define a CoroutineScope, which controls the lifetime of the coroutines launched in it. Each asynchronous operation runs within a particular scope.
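To make the relationship between a scope, a Job, and a Deferred concrete, here is a minimal sketch. The function name scopeDemo is made up for illustration, and the example assumes the kotlinx.coroutines library is on the classpath.

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo function, not part of any library.
suspend fun scopeDemo(): Pair<Int, Boolean> {
    // A scope owned by this example; cancelling it cancels everything launched in it.
    val scope = CoroutineScope(Dispatchers.Default + Job())

    // launch returns a Job: a handle to the running coroutine, without a result.
    val job: Job = scope.launch {
        while (isActive) delay(100)
    }

    // async returns a Deferred<T>: a Job that also carries a result.
    val deferred: Deferred<Int> = scope.async { 21 * 2 }
    val result = deferred.await()

    scope.cancel() // cancels the loop that is still running
    job.join()     // wait until the cancellation has completed
    return result to job.isCancelled
}

fun main() = runBlocking {
    println(scopeDemo()) // prints (42, true)
}
```

On Android you would normally not create the scope yourself but use viewModelScope or lifecycleScope, which are cancelled for you.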
Edit: note that the API suggested in the original answer is now marked @ObsoleteCoroutinesApi:
"Ticker channels are not currently integrated with structured concurrency and their api will change in the future."
You can now use the Flow API to create your own ticker flow:
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun tickerFlow(period: Duration, initialDelay: Duration = Duration.ZERO) = flow {
    delay(initialDelay)
    while (true) {
        emit(Unit)
        delay(period)
    }
}
And you can use it in a way very similar to your current code:
tickerFlow(5.seconds)
    .map { LocalDateTime.now() }
    .distinctUntilChanged { old, new ->
        old.minute == new.minute
    }
    .onEach {
        setDateTime(it)
    }
    .launchIn(viewModelScope) // or lifecycleScope or other
Note: with the code as written here, the time taken to process elements is not taken into account by tickerFlow, so the ticks might not be regular (the delay runs from the end of one element's processing to the next emission). If you want the ticker to tick independently of the processing of each element, you may want to use a buffer or a dedicated thread (e.g. via flowOn).
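Another way to keep ticks from drifting with processing time is to schedule each tick against a monotonic clock instead of delaying a fixed period after each emission. The following is only a sketch of that idea, not the answer's original approach: the name fixedRateTickerFlow is made up, and it relies on System.nanoTime(), so it assumes the JVM.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.nanoseconds

// Hypothetical helper, not part of kotlinx.coroutines.
fun fixedRateTickerFlow(
    period: Duration,
    initialDelay: Duration = Duration.ZERO
): Flow<Unit> {
    require(period.isPositive()) { "period must be positive" }
    return flow {
        delay(initialDelay)
        val periodNanos = period.inWholeNanoseconds
        var nextTick = System.nanoTime()
        while (true) {
            emit(Unit) // suspends until the collector has processed the tick
            nextTick += periodNanos
            // Sleep only for what is left of the period after processing.
            val remaining = nextTick - System.nanoTime()
            if (remaining > 0) delay(remaining.nanoseconds)
            // If processing overran the period, the next tick is emitted immediately.
        }
    }
}

fun main() = runBlocking {
    fixedRateTickerFlow(100.milliseconds)
        .take(3)
        .collect {
            delay(30) // simulated processing time
            println("tick")
        }
}
```

A collector that is consistently slower than the period will receive ticks back to back while the flow catches up; adding conflate() or buffer() between the ticker and the slow stage is the usual way to decouple them.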
I believe it is still experimental, but you may use a TickerChannel to produce values every X millis:
val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)
repeat(10) {
    tickerChannel.receive()
    val currentTime = LocalDateTime.now()
    println(currentTime)
}
If you need to carry on doing your work while your "subscribe" does something for each "tick", you may launch a background coroutine that will read from this channel and do the thing you want:
val tickerChannel = ticker(delayMillis = 60_000, initialDelayMillis = 0)
launch {
    for (event in tickerChannel) {
        // the 'event' variable is of type Unit, so we don't really care about it
        val currentTime = LocalDateTime.now()
        println(currentTime)
    }
}
delay(1000)
// when you're done with the ticker and don't want more events
tickerChannel.cancel()
If you want to stop from inside the loop, you can simply break out of it, and then cancel the channel:
val ticker = ticker(500, 0)
var count = 0
for (event in ticker) {
    count++
    if (count == 4) {
        break
    } else {
        println(count)
    }
}
ticker.cancel()
A very pragmatic approach with Kotlin Flows could be:
// Create the timer flow
val timer = (0..Int.MAX_VALUE)
    .asSequence()
    .asFlow()
    .onEach { delay(1_000) } // specify delay
// Consume it
timer.collect {
    println("bling: ${it}")
}
Another possible solution is a reusable Kotlin extension function on CoroutineScope:
fun CoroutineScope.launchPeriodicAsync(
    repeatMillis: Long,
    action: () -> Unit
) = this.async {
    if (repeatMillis > 0) {
        while (isActive) {
            action()
            delay(repeatMillis)
        }
    } else {
        action()
    }
}
Usage:
val job = CoroutineScope(Dispatchers.IO).launchPeriodicAsync(100) {
    //...
}
To interrupt it:
job.cancel()
Another note: this assumes that action is non-blocking and takes negligible time.
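If the action may suspend or block, one option is to accept a suspending lambda and return a plain Job, since no result is produced. This is a sketch under that assumption; launchPeriodic is a made-up name, and Thread.sleep merely stands in for blocking work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical variant of launchPeriodicAsync that takes a suspending action.
fun CoroutineScope.launchPeriodic(
    repeatMillis: Long,
    action: suspend () -> Unit
): Job = launch {
    if (repeatMillis > 0) {
        while (isActive) {
            action()
            delay(repeatMillis) // the pause starts after the action has finished
        }
    } else {
        action() // non-positive period: run once
    }
}

fun main() = runBlocking {
    var ticks = 0
    val job = launchPeriodic(50) {
        // Blocking work is moved to the IO dispatcher so the caller's thread stays free.
        withContext(Dispatchers.IO) { Thread.sleep(10) }
        ticks++
    }
    delay(200)
    job.cancelAndJoin()
    println("ticks: $ticks")
}
```

As with the original, the effective period is repeatMillis plus however long the action takes.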