
C# async within an action

Tags: c#, action, task

I would like to write a method which accepts several parameters, including an action and a retry count, and invokes the action.

So I have this code:

public static IEnumerable<Task> RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        object lockObj = new object();
        int index = 0;

        return new Action(async () =>
        {
            while (true)
            {
                T item;
                lock (lockObj)
                {
                    if (index < source.Count)
                    {
                        item = source[index];
                        index++;
                    }
                    else
                        break;
                }

                int retry = retries;
                while (retry > 0)
                {
                    try
                    {
                        bool res = await action(item);
                        if (res)
                            retry = -1;
                        else
                            //sleep if not success..
                            Thread.Sleep(200);

                    }
                    catch (Exception e)
                    {
                        LoggerAgent.LogException(e, method);
                    }
                    finally
                    {
                        retry--;
                    }
                }
            }
        }).RunParallel(threads);
    }

RunParallel is an extension method for Action; it looks like this:

public static IEnumerable<Task> RunParallel(this Action action, int amount)
    {
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < amount; i++)
        {
            Task task = Task.Factory.StartNew(action);
            tasks.Add(task);
        }
        return tasks;
    }

Now, the issue: the thread just disappears or collapses without waiting for the action to finish.

I wrote this example code:

private static async Task ex()
    {
        List<int> ints = new List<int>();
        for (int i = 0; i < 1000; i++)
        {
            ints.Add(i);
        }

        var tasks = RetryComponent.RunWithRetries(ints, 100, async (num) =>
        {
            try
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + num + "  " + test[0]);
                return test[0] == "test";
            }
            catch (Exception e)
            {
                Console.WriteLine(e.StackTrace);
                return false;
            }

        }, 5, "test");

        await Task.WhenAll(tasks);
    }

fetchSmthFromDb is a simple method returning Task<List<string>> which fetches something from the db and works perfectly fine when invoked outside of this example.

Whenever the line List<string> test = await fetchSmthFromDb(); is executed, the thread seems to close and Console.WriteLine("#" + num + " " + test[0]); is never reached. When debugging, the breakpoint on that line is never hit either.

The Final Working Code

private static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
    {
        while (true)
        {
            try
            {
                await action();
                break;
            }
            catch (Exception e)
            {
                LoggerAgent.LogException(e, method);
            }

            if (retryCount <= 0)
                break;

            retryCount--;
            await Task.Delay(200);
        }
    }

    public static async Task RunWithRetries<T>(List<T> source, int threads, Func<T, Task<bool>> action, int retries, string method)
    {
        Func<T, Task> newAction = async (item) =>
        {
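            await DoWithRetries(async ()=>
            {
                // DoWithRetries only retries on exceptions, so turn a false
                // result into one; otherwise a failed attempt is silently accepted.
                if (!await action(item))
                    throw new InvalidOperationException("action returned false");
            }, retries, method);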
        };
        await source.ParallelForEachAsync(newAction, threads);
    }

Ori Refael asked Aug 28 '16 13:08


1 Answer

The problem is in this line:

return new Action(async () => ...

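You start an async operation with the async lambda, but because the lambda is wrapped in an Action (making it async void) it does not return a task you can await. The work does run on worker threads, but the task created by Task.Factory.StartNew completes as soon as the lambda reaches its first await, so you never find out when the operation is really done. Your program terminates before the async operation is complete, which is why you don't see any output.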

It needs to be:

return new Func<Task>(async () => ...
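
Note that RunParallel from the question only accepts an Action, so it needs a matching change. The following is a minimal sketch of that change, not the asker's final code: the class name RunParallelSketch and the Func<Task> overload of RunParallel are assumptions made for illustration. It uses Task.Run, which returns a task that completes only when the async delegate has finished. (Task.Factory.StartNew with a Func<Task> would return a Task<Task> that has to be unwrapped with Unwrap().)

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class RunParallelSketch
{
    // Hypothetical overload of the question's RunParallel that takes Func<Task>.
    // Each returned task completes only when the async delegate has finished.
    public static IEnumerable<Task> RunParallel(this Func<Task> action, int amount)
    {
        var tasks = new List<Task>();
        for (int i = 0; i < amount; i++)
            tasks.Add(Task.Run(action));
        return tasks;
    }

    public static async Task Main()
    {
        // An async lambda assigned to Action is "async void": the task created
        // by StartNew finishes at the first await inside the lambda.
        bool done = false;
        Action asyncVoid = async () => { await Task.Delay(500); done = true; };
        await Task.Factory.StartNew(asyncVoid);
        Console.WriteLine("Action awaited, done = " + done); // typically False

        // The same kind of lambda as Func<Task> can actually be awaited.
        int finished = 0;
        Func<Task> asyncTask = async () =>
        {
            await Task.Delay(500);
            Interlocked.Increment(ref finished);
        };
        await Task.WhenAll(asyncTask.RunParallel(4));
        Console.WriteLine("Func<Task> awaited, finished = " + finished);
    }
}
```

With the Func<Task> version, awaiting Task.WhenAll on the returned tasks waits for all four delegates to run to completion.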

UPDATE

First, you need to split responsibilities of methods, so you don't mix retry policy (which should not be hardcoded to a check of a boolean result) with running tasks in parallel.

Then, as previously mentioned, you run your while (true) loop 100 times instead of doing things in parallel.

As @MachineLearning pointed out, use Task.Delay instead of Thread.Sleep.
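
The reason is that Thread.Sleep blocks the thread it runs on, whereas Task.Delay returns a task and frees the thread while waiting. A small sketch of the effect, with DelayDemo and WorkAsync as made-up names for illustration:

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class DelayDemo
{
    // Waits asynchronously; no thread is held for the duration of the delay.
    public static async Task<int> WorkAsync(int id)
    {
        await Task.Delay(200);
        return id;
    }

    public static async Task Main()
    {
        var sw = Stopwatch.StartNew();

        // Start 100 delays at once and wait for all of them.
        int[] ids = await Task.WhenAll(
            Enumerable.Range(0, 100).Select(id => WorkAsync(id)));

        Console.WriteLine(ids.Length + " delays finished in "
            + sw.ElapsedMilliseconds + " ms");
    }
}
```

Because the delays overlap without occupying threads, the total time should be close to a single 200 ms delay rather than 100 of them in sequence. Replacing Task.Delay with Thread.Sleep inside thread-pool work would instead tie up one pool thread per sleeping item.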

Overall, your solution looks like this:

using System.Collections.Async;

static async Task DoWithRetries(Func<Task> action, int retryCount, string method)
{
    while (true)
    {
        try
        {
            await action();
            break;
        }
        catch (Exception e)
        {
            LoggerAgent.LogException(e, method);
        }

        if (retryCount <= 0)
            break;

        retryCount--;
        await Task.Delay(millisecondsDelay: 200);
    }
}

static async Task Example()
{
    List<int> ints = new List<int>();
    for (int i = 0; i < 1000; i++)
        ints.Add(i);

    Func<int, Task> actionOnItem =
        async item =>
        {
            await DoWithRetries(async () =>
            {
                List<string> test = await fetchSmthFromDb();
                Console.WriteLine("#" + item + "  " + test[0]);
                if (test[0] != "test")
                    throw new InvalidOperationException("unexpected result"); // will be re-tried
            },
            retryCount: 5,
            method: "test");
        };

    await ints.ParallelForEachAsync(actionOnItem, maxDegreeOfParalellism: 100);
}

You need to use the AsyncEnumerator NuGet Package in order to use the ParallelForEachAsync extension method from the System.Collections.Async namespace.
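
If adding a package is not an option, a similar throttled loop can be sketched with SemaphoreSlim from the base class library. This is not how AsyncEnumerator implements ParallelForEachAsync; ThrottleSketch, ForEachThrottledAsync and RunOneAsync are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Hypothetical helper: runs body for every item, with at most
    // maxParallel invocations in flight at any time.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source, Func<T, Task> body, int maxParallel)
    {
        using (var gate = new SemaphoreSlim(maxParallel))
        {
            var tasks = new List<Task>();
            foreach (T item in source)
            {
                await gate.WaitAsync();               // wait for a free slot
                tasks.Add(RunOneAsync(item, body, gate));
            }
            await Task.WhenAll(tasks);                // also surfaces failures
        }
    }

    private static async Task RunOneAsync<T>(
        T item, Func<T, Task> body, SemaphoreSlim gate)
    {
        try { await body(item); }
        finally { gate.Release(); }                   // free the slot even on failure
    }

    public static async Task Main()
    {
        int current = 0, peak = 0;
        object sync = new object();

        await ForEachThrottledAsync(Enumerable.Range(0, 20), async i =>
        {
            int now = Interlocked.Increment(ref current);
            lock (sync) { peak = Math.Max(peak, now); }
            await Task.Delay(50);
            Interlocked.Decrement(ref current);
        }, maxParallel: 4);

        Console.WriteLine("peak concurrency: " + peak); // never above 4
    }
}
```

The body passed in can be the same DoWithRetries wrapper shown above, which keeps the retry policy separate from the throttling.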

Serge Semenov answered Oct 22 '22 10:10