I wrote a simple (I thought...) rate limiter to keep an event-driven system under our licensed API hit limits. For some reason it sometimes seizes up after 400-500 requests have been sent through.
My best idea is that I have screwed up the wait function so that in some circumstances it never returns, but I have been unable to locate the flawed logic. Another idea is that I have botched the Async/Task interop, causing issues. It always works initially and then fails later. A single instance of ApiRateLimiter is being shared across several components in order to honor hit limits system-wide.

    type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

    type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

        let requestLimit = Math.Max(limitCount,1)

        let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox ->

            let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
                let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
                let requestsWithinLimit =
                    recentRequestsTimeSent
                    |> Seq.filter(fun x -> x >= cutoffTime)
                    |> Seq.toList
                if requestsWithinLimit.Length >= requestLimit then
                    let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
                    return! waitUntilUnderLimit requestsWithinLimit
                else
                    return requestsWithinLimit
            }

            let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
                // read a message
                let! keyedRequest,replyChannel = inbox.Receive()
                // wait until we are under our rate limit
                let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent
                let rightNow = DateTimeOffset.UtcNow
                let! response =
                    keyedRequest.Request
                    |> httpClient.SendAsync
                    |> Async.AwaitTask
                replyChannel.Reply { Key = keyedRequest.Key; Response = response }
                return! messageLoop (seq {
                    yield rightNow
                    yield! remainingRecentRequests
                })
            }

            // start the loop
            messageLoop (Seq.empty<DateTimeOffset>)
        )

        member this.QueueApiRequest keyedRequest =
            async {
                return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
            } |> Async.StartAsTask

Some of the requests are large and take a little bit of time, but nothing that could cause the total death of request sending that I am seeing with this thing.
Thanks for taking a second to look!
I notice that you're building up a list of the most recent times a request was sent by using a seq:

    seq {
        yield rightNow
        yield! remainingRecentRequests
    }

Because F# sequences are lazy, this produces an enumerator that, when asked for its next value, first yields rightNow and then starts iterating through its child seq, yielding each of its values in turn. Every time you record a new request, you wrap the previous seq in another enumerator, but when are the old ones released? You'd think they would be released once they expire, that is, once the Seq.filter predicate in waitUntilUnderLimit returns false for them. But think about it: how would the F# compiler know that the filter condition will always be false once it has been false once? Without a deep code analysis (which the compiler doesn't do), it can't. So the "old" seqs can never be garbage collected, because they're still being kept around in case they're ever needed. I'm not 100% certain of this because I haven't measured the memory usage of your code, but if you were to measure the memory use of your ApiRateLimiter instance, I bet you'd see it growing steadily without ever going down.
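To make the laziness concrete, here is a minimal sketch that is not taken from your code (the names evaluations and lazyTimes are made up for illustration). It only shows that the body of a seq expression is deferred and re-run on every enumeration; it does not measure memory use, so treat it as background for the argument above rather than proof of it.

```fsharp
// Hypothetical demo: counts how many times the body of a seq expression runs.
let evaluations = ref 0

let lazyTimes =
    seq {
        evaluations.Value <- evaluations.Value + 1
        yield 1
        yield! [ 2; 3 ]
    }

// Defining the seq has not run its body yet.
let beforeEnumeration = evaluations.Value

// Each full enumeration re-runs the body from the start.
let firstPass = Seq.toList lazyTimes
let secondPass = Seq.toList lazyTimes
let afterTwoPasses = evaluations.Value

printfn "before: %d, after two passes: %d" beforeEnumeration afterTwoPasses
```

A list, by contrast, is built once and simply holds its values, which is why it is easier to reason about what is still being kept alive.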
I also noticed that you are adding the new items to the front of the seq. These are exactly the semantics an F# list would give you, but with a list there are no IEnumerable objects to allocate, and once a list item fails the List.filter condition, it is left out of the new list and becomes eligible for garbage collection. I therefore rewrote your code to use a list of recent times instead of a seq, and I made one other change for efficiency: since the way you create your list guarantees that it will be sorted, with the most recent events first and the oldest last, I replaced List.filter with List.takeWhile. That way, as soon as it reaches the first date older than the cutoff, it stops checking the remaining (older) dates.
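As a small sketch of why that swap is safe on a newest-first list, the snippet below runs both functions over the same hypothetical timestamps and counts how often the predicate is called. The fixed now value, the 10-second window, and the helper isRecent are assumptions made up for the example, not part of your limiter.

```fsharp
open System

// Hypothetical fixed clock and a 10-second window, for illustration only.
let now = DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero)
let cutoff = now.AddSeconds(-10.0)

// Newest first, the same order the limiter builds with (rightNow :: older).
let sentTimes =
    [ now.AddSeconds(-1.0)
      now.AddSeconds(-5.0)
      now.AddSeconds(-30.0)
      now.AddSeconds(-60.0) ]

let filterChecks = ref 0
let takeWhileChecks = ref 0

// Same predicate for both; the counter records how many items were examined.
let isRecent (counter: int ref) (t: DateTimeOffset) =
    counter.Value <- counter.Value + 1
    t >= cutoff

let viaFilter = sentTimes |> List.filter (isRecent filterChecks)
let viaTakeWhile = sentTimes |> List.takeWhile (isRecent takeWhileChecks)

printfn "same result: %b" (viaFilter = viaTakeWhile)
printfn "filter checked %d items, takeWhile checked %d" filterChecks.Value takeWhileChecks.Value
```

Both calls keep the two recent timestamps, but List.filter examines all four items while List.takeWhile stops at the first expired one. Note that this only holds because the list is sorted newest first; on an unsorted list the two would not be interchangeable.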
With this change, you should now have old dates actually expiring, and the memory usage of your ApiRateLimiter class should fluctuate but stay roughly constant. (It will create a new list every time waitUntilUnderLimit is called, so there will be some GC pressure, but those lists should all be in generation 0.) I don't know whether this will solve your hanging problem, but it is the only problem I could see in your code.
BTW, I also replaced your line let! _ = Async.Sleep 100 with do! Async.Sleep 100, which is simpler. No efficiency gains here, but there's no need to use let! _ = to wait for an Async<unit> to return; that's precisely what the do! keyword is for.
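For comparison, here is a tiny standalone sketch of the two forms side by side (the names pauseWithLet and pauseWithDo and the 10 ms delay are made up for the example). Both workflows wait for the same Async<unit> and then return the same value.

```fsharp
// Binds the unit result to a throwaway pattern.
let pauseWithLet =
    async {
        let! _ = Async.Sleep 10
        return "done"
    }

// Awaits the same Async<unit> without binding anything.
let pauseWithDo =
    async {
        do! Async.Sleep 10
        return "done"
    }

let resultWithLet = Async.RunSynchronously pauseWithLet
let resultWithDo = Async.RunSynchronously pauseWithDo

printfn "%s / %s" resultWithLet resultWithDo
```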

    type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

    type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

        let requestLimit = Math.Max(limitCount,1)

        let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox ->

            let rec waitUntilUnderLimit (recentRequestsTimeSent: DateTimeOffset list) = async{
                let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
                let requestsWithinLimit =
                    recentRequestsTimeSent
                    |> List.takeWhile (fun x -> x >= cutoffTime)
                if List.length requestsWithinLimit >= requestLimit then
                    do! Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
                    return! waitUntilUnderLimit requestsWithinLimit
                else
                    return requestsWithinLimit
            }

            let rec messageLoop (mostRecentRequestsTimeSent: DateTimeOffset list) = async{
                // read a message
                let! keyedRequest,replyChannel = inbox.Receive()
                // wait until we are under our rate limit
                let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent
                let rightNow = DateTimeOffset.UtcNow
                let! response =
                    keyedRequest.Request
                    |> httpClient.SendAsync
                    |> Async.AwaitTask
                replyChannel.Reply { Key = keyedRequest.Key; Response = response }
                return! messageLoop (rightNow :: remainingRecentRequests)
            }

            // start the loop
            messageLoop []
        )

        member this.QueueApiRequest keyedRequest =
            async {
                return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
            } |> Async.StartAsTask