Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Throttling concurrent async requests with loop

I am currently working on making a large amount of requests to a web API. I have tried to async this process so that I can do so in a reasonable amount of time, however I am unable to throttle the connections so that I don't send more than 10 requests/second. I am using a semaphore for the throttling but I am not entirely sure how it will work in this context as I have a nested loop.

I am essentially getting a list of models, and each model has a list of days inside it. I need to make a request for each day inside the models. The amount of days can be anywhere from 1 to about 50, 99% of the time it's only going to be 1. So I want to async each model because there will be about 3000 of them, but I want to async the days in the case that there are multiple days that need to be completed. I need to stay at or under 10 requests/second so I thought the best way to do so would be to set a request limit of 10 on the entire operation. Is there a place that I can put a semaphore that will limit connections to the entire chain?

Each individual request also has to make two requests for 2 different pieces of data and this API does not support any sort of batching right now.

I am sort of new to c#, very new to async and very new to WebRequests/HttpClient so any help is appreciated. I tried to add all relevant code here. If you need anything else let me know.

public static async Task GetWeatherDataAsync(List<Model> models)
{
    SemaphoreSlim semaphore = new SemaphoreSlim(10);
    var taskList = new List<Task<ComparisonModel>>();

    foreach (var x in models)
    {
        await semaphore.WaitAsync();
        taskList.Add(CompDaysAsync(x));
    }

    try
    {
        await Task.WhenAll(taskList.ToArray());
    }
    catch (Exception e) { }
    finally
    {
        semaphore.Release();
    }
}

public static async Task<Models> CompDaysAsync(Model model)
{
    var httpClient = new HttpClient();
    httpClient.DefaultRequestHeaders.Authorization = new 
                Headers.AuthenticationHeaderValue("Token","xxxxxxxx");
    httpClient.Timeout = TimeSpan.FromMinutes(5);
    var taskList = new List<Task<Models.DateTemp>>();

    foreach (var item in model.list)
    {
        taskList.Add(WeatherAPI.GetResponseForDayAsync(item, 
            httpClient, Latitude, Longitude));
    }
    httpClient.Dispose();
    try
    {
        await Task.WhenAll(taskList.ToArray());
    }
    catch (Exception e) { }

    return model;
}

public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude)
{
    var response = await httpClient.GetStreamAsync(request1);
    StreamReader myStreamReader = new StreamReader(response);
    string responseData = myStreamReader.ReadToEnd();
    double[] data = new double[2];
    if (responseData != "[[null, null]]")
    {
        data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse);
    }
    else { data = null; };

    double precipData = 0;
    var response2 = await httpClient.GetStreamAsync(request2);
    StreamReader myStreamReader2 = new StreamReader(response2);
    string responseData2 = myStreamReader2.ReadToEnd();
    if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]")
    {
        precipData = double.Parse(responseData2.Replace("[", "").Replace("]", ""));
    }
    date.Precip = precipData;

    if (data != null)
    {
        date.minTemp = data[0];
        date.maxTemp = data[1];
    }
    return date;
}
like image 864
DevDevDev Avatar asked Jun 13 '17 16:06

DevDevDev


1 Answers

I think that you completely don't understand what does SemaphoreSlim do.

  1. Your semaphore is a method-level based local variable, so every GetWeatherDataAsync method call will spawn 10 calls to your API, without waiting for other client.
  2. Moreover, your code will deadlock, if models.Count > 10, because you waiting for semaphore in each iteration, those requests are being stacked, and for 11th your thread will hang forever, as you aren't releasing semaphore:

    var semaphore = new SemaphoreSlim(10);
    
    foreach (var item in Enumerable.Range(0, 15))
    {
        // will stop after 9
        await semaphore.WaitAsync();
        Console.WriteLine(item);
    }
    

What you really need to do is move the semaphore to instance-level (or even type-level with static keyword), and wait for it inside GetWeatherDataAsync, and put Release in finally block.

As for Parallel.Foreach - you should not use it in this scenario, as it's not aware of async methods (it was introduced before async/await), and your methods doesn't look like they are CPU-bound.

like image 99
VMAtm Avatar answered Sep 27 '22 21:09

VMAtm