
How to implement a rate limit across multiple coroutines efficiently?

So let's say I have a bunch of coroutines running that interact with some web service, and since I don't want to spam it, I want to limit the requests to at most 1 request every x seconds. For that I could use some code like this:

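fun CoroutineScope.rateLimiter(tokens: SendChannel<Unit>, rate: Int) = launch {
    // rate is the minimum interval between tokens, in milliseconds
    var lastToken = System.currentTimeMillis()
    while (isActive) {
        val elapsed = System.currentTimeMillis() - lastToken
        if (elapsed < rate) {
            // Wait out the remainder of the interval, not the time already elapsed
            delay(rate - elapsed)
        }
        tokens.send(Unit)
        // Record when this token was handed out so the next interval starts here
        lastToken = System.currentTimeMillis()
    }
}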

fun CoroutineScope.request(tokens: ReceiveChannel<Unit>) = launch { 
    for (token in tokens) {
        //Do Web request
    }
}

1.) Is this an efficient way to do it?

2.) This isn't expandable to, say, limiting something to x bytes/second, or anything else where I would need to request x tokens out of a token bucket. What would be the best way to implement something like that with coroutines?

usbpc102 asked Nov 16 '22 23:11


1 Answer

If you want to avoid depending on jobs and channels, which might create more permits than are being consumed and then cause a stampeding herd once some process starts taking permits, maybe this is the solution for you.

(There are some JVM-specific calls in here, but they can be replaced for multiplatform.)


import kotlin.math.max
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SimpleRateLimiter(eventsPerSecond: Double) {

    private val mutex = Mutex()

    @Volatile
    private var next: Long = Long.MIN_VALUE
    private val delayNanos: Long = (1_000_000_000L / eventsPerSecond).toLong()

    /**
     * Suspend the current coroutine until its calculated time of exit
     * from the rate limiter
     */
    suspend fun acquire() {
        val now: Long = System.nanoTime()
        val until = mutex.withLock {
            max(next, now).also {
                next = it + delayNanos
            }
        }
        if (until != now) {
            delay((until - now) / 1_000_000)
        }
    }
}

It of course comes with other tradeoffs:

  • Behavior when nanoTime is nearing Long.MAX_VALUE is almost certainly broken, since neither max(next, now) nor the addition accounts for overflow.
  • No implementation of maxDelay/timeout
  • No way of grabbing multiple permits
  • No tryAcquire implementation
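
For the second part of the question (x bytes/second, or taking several tokens at once), here is a minimal sketch of how the same idea might be extended to cover two of the gaps listed above. It is not part of any library: the class name MultiPermitRateLimiter, the reserve helper and the injectable clock parameter are all made up for illustration. It assumes that a caller asking for N permits on an idle limiter goes through immediately and that the cost is paid by whoever comes next.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical extension of SimpleRateLimiter; names are illustrative only.
class MultiPermitRateLimiter(
    permitsPerSecond: Double,
    // Assumption: the clock is injectable so the logic can be tested without real time.
    private val clock: () -> Long = { System.nanoTime() }
) {
    private val mutex = Mutex()
    private val nanosPerPermit: Long = (1_000_000_000L / permitsPerSecond).toLong()

    // Earliest time at which the next caller may proceed; only touched under the mutex.
    private var next: Long = Long.MIN_VALUE

    /** Books [permits] and returns how many nanoseconds the caller has to wait. */
    suspend fun reserve(permits: Int = 1): Long {
        require(permits > 0) { "permits must be positive" }
        return mutex.withLock {
            val now = clock()
            val start = maxOf(next, now)
            // Later callers are pushed back by the cost of this request.
            next = start + permits * nanosPerPermit
            start - now
        }
    }

    /** Suspends until the booked slot is reached. */
    suspend fun acquire(permits: Int = 1) {
        val waitNanos = reserve(permits)
        if (waitNanos > 0) {
            // Round up to whole milliseconds so we never wake up early.
            delay((waitNanos + 999_999) / 1_000_000)
        }
    }

    /** Takes [permits] only if no waiting is needed; never suspends for the rate. */
    suspend fun tryAcquire(permits: Int = 1): Boolean {
        require(permits > 0) { "permits must be positive" }
        return mutex.withLock {
            val now = clock()
            if (next > now) {
                false
            } else {
                next = now + permits * nanosPerPermit
                true
            }
        }
    }
}
```

With this sketch, a byte-rate limit would be something like MultiPermitRateLimiter(permitsPerSecond = 64_000.0) with acquire(chunk.size) before each write. It still shares the overflow caveat above and has no timeout handling.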

If you want an IntervalLimiter that allows X requests every Y seconds and then throws exceptions, there is the RateLimiter in Resilience4j. Or, if you want something more fully featured, I'm working on a PR to add both a RateLimiter and an IntervalLimiter to the coroutines core project.

Jens answered Dec 07 '22 23:12
