 

Using async/await with DataReader? (without intermediate buffers!)

My goal is simple: I want to make asynchronous I/O calls (using async/await), but:

  • Without a TPL Dataflow dependency (which this answer uses)
  • Without intermediate buffers (which this answer uses)
  • The projector function should be passed as an argument (this answer does not do that)

Ok.

Here is my current code. Its job is to read from the DB and project each row through a Func<>:

public IEnumerable<T> GetSomeData<T>(string sql, Func<IDataRecord, T> projector)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            _conn.Open();
            _cmd.CommandTimeout = 100000;
            using (IDataReader rdr = _cmd.ExecuteReader())
            {
                // Lazily project one row per iteration; the connection stays open
                // until the caller finishes (or disposes) the enumeration.
                while (rdr.Read())
                    yield return projector(rdr);
            }
        }
    }
}

So, what is the projector?

Each class has a static factory function that takes a record (IDataRecord) and creates an entity:

Example:

public class MyClass
{
    public static MyClass MyClassFactory(IDataRecord record)
    {
        return new MyClass
        {
            Name = record["Name"].ToString(),
            Datee = DateTime.Parse(record["Datee"].ToString()),
            val = decimal.Parse(record["val"].ToString())
        };
    }

    public string Name { get; set; }
    public DateTime Datee { get; set; }
    public decimal val { get; set; }
}

So here, MyClassFactory is the Func<IDataRecord, MyClass> projector.

So how do I currently run it?

var sql = @"SELECT TOP 1000 [NAME], [datee], [val] FROM [WebERP].[dbo].[t]";
var a = GetSomeData<MyClass>(sql, MyClass.MyClassFactory).Where(...); // notice the Func

All ok.
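The projector pattern can be tried without a SQL Server instance by feeding the same read loop from an in-memory DataTableReader, which implements IDataReader. The following is a minimal sketch under that assumption; ProjectorDemo, Project and SampleTable are hypothetical names, and the sample rows are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

// Hypothetical helper class: same shape as GetSomeData, but it reads from any
// IDataReader so it can run against an in-memory DataTable instead of SQL Server.
public static class ProjectorDemo
{
    public static IEnumerable<T> Project<T>(IDataReader rdr, Func<IDataRecord, T> projector)
    {
        using (rdr)
        {
            // One row is read and projected per MoveNext; no list is built here.
            while (rdr.Read())
                yield return projector(rdr);
        }
    }

    // Two made-up rows using the question's column names.
    public static DataTable SampleTable()
    {
        var table = new DataTable();
        table.Columns.Add("Name", typeof(string));
        table.Columns.Add("Datee", typeof(DateTime));
        table.Columns.Add("val", typeof(decimal));
        table.Rows.Add("a", new DateTime(2014, 5, 25), 1.5m);
        table.Rows.Add("b", new DateTime(2014, 5, 26), 2.5m);
        return table;
    }
}
```

Because Project is an iterator, rows are read one at a time as the caller enumerates, just like the question's synchronous GetSomeData, and a streaming operator such as Where runs over the projected objects as they arrive.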

The problem starts now:

Adding async to the method produces an error (yes, I know that IEnumerable is a synchronous interface, hence the problem):

public async Task<IEnumerable<T>> GetSomeData<T>(string sql, Func<IDataRecord, T> projector)

cannot be an iterator block because 'System.Threading.Tasks.Task<System.Collections.Generic.IEnumerable<T>>' is not an iterator interface type

But this guy here did it (his code was shown as a screenshot in the original post), and it DOES compile.

Question

How can I convert my code to support fully asynchronous I/O calls?

(Under these conditions: no TPL Dataflow dependency, the projector function passed as an argument, no intermediate buffers.)

asked May 25 '14 at 09:05 by Royi Namir




2 Answers

> I want to do asynchronous I/O calls (using async/await), but:
>
>   • Without using a DataFlow dependency (like in this answer)
>   • Without middle buffers (not like this answer)
>   • The projector function should be sent as an argument (not like this answer)

You may want to check Stephen Toub's "Tasks, Monads, and LINQ" for some great ideas on how to process asynchronous data sequences.

It's not (yet) possible to combine yield and await, but I'm going to be a verbalist here: the quoted requirements didn't list IEnumerable and LINQ. So, here's a possible solution shaped as two coroutines (almost untested).

Data producer routine (corresponds to IEnumerable with yield):

public async Task GetSomeDataAsync<T>(
    string sql, Func<IDataRecord, T> projector, ProducerConsumerHub<T> hub)
{
    using (SqlConnection _conn = new SqlConnection(@"Data Source=..."))
    {
        using (SqlCommand _cmd = new SqlCommand(sql, _conn))
        {
            await _conn.OpenAsync();
            _cmd.CommandTimeout = 100000;
            using (var rdr = await _cmd.ExecuteReaderAsync())
            {
                while (await rdr.ReadAsync())
                    await hub.ProduceAsync(projector(rdr));
            }
        }
    }
}

Data consumer routine (corresponds to foreach or a LINQ expression):

public async Task ConsumeSomeDataAsync(string sql)
{
    var hub = new ProducerConsumerHub<IDataRecord>();
    var producerTask = GetSomeDataAsync(sql, rdr => rdr, hub);

    while (true)
    {
        var nextItemTask = hub.ConsumeAsync();
        await Task.WhenAny(producerTask, nextItemTask);

        if (nextItemTask.IsCompleted)
        {
            // process the next data item
            Console.WriteLine(await nextItemTask);
        }

        if (producerTask.IsCompleted)
        {
            // process the end of sequence
            await producerTask;
            break;
        }
    }
}

Coroutine execution helper (can also be implemented as a pair of custom awaiters):

public class ProducerConsumerHub<T>
{
    TaskCompletionSource<Empty> _consumer = new TaskCompletionSource<Empty>();
    TaskCompletionSource<T> _producer = new TaskCompletionSource<T>();

    // TODO: make thread-safe
    public async Task ProduceAsync(T data)
    {
        _producer.SetResult(data);
        await _consumer.Task;
        _consumer = new TaskCompletionSource<Empty>();
    }

    public async Task<T> ConsumeAsync()
    {
        var data = await _producer.Task;
        _producer = new TaskCompletionSource<T>();
        _consumer.SetResult(Empty.Value);
        return data;
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

This is just an idea. It might be overkill for a simple task like this, and it could be improved in some areas (like thread-safety, race conditions, and handling the end of the sequence without touching producerTask). Yet it illustrates how asynchronous data retrieval and processing could be decoupled.
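The thread-safety and end-of-sequence improvements mentioned above could be approached in several ways. One possible sketch, assuming exactly one producer and one consumer, replaces the two TaskCompletionSource fields with a pair of SemaphoreSlim signals and an explicit Complete() call, so the consumer no longer has to watch producerTask to detect the end. RendezvousHub and its members are hypothetical names, not part of the answer above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical single-producer/single-consumer hand-off: each item goes straight
// from producer to consumer, and the producer waits until it has been taken.
public sealed class RendezvousHub<T>
{
    private readonly SemaphoreSlim _itemReady = new SemaphoreSlim(0, 1);
    private readonly SemaphoreSlim _itemTaken = new SemaphoreSlim(0, 1);
    private T _item;
    private bool _completed;

    // Publishes one item, then waits until the consumer has taken it.
    public async Task ProduceAsync(T item)
    {
        _item = item;
        _itemReady.Release();
        await _itemTaken.WaitAsync().ConfigureAwait(false);
    }

    // Signals end of sequence. Call once, after the last ProduceAsync has completed
    // (typically from a finally block in the producer).
    public void Complete()
    {
        _completed = true;
        _itemReady.Release();
    }

    // Returns (true, item) for the next item, or (false, default) after Complete.
    public async Task<(bool HasItem, T Item)> ConsumeAsync()
    {
        await _itemReady.WaitAsync().ConfigureAwait(false);
        if (_completed)
            return (false, default(T));

        T item = _item;
        _item = default(T);
        _itemTaken.Release(); // lets the pending ProduceAsync return
        return (true, item);
    }
}
```

The consumer loops on ConsumeAsync() until HasItem is false. The producer's task should still be awaited afterwards so that any exception it threw is observed, and a consumer that stops early would leave the producer waiting, so this sketch assumes the sequence is always drained.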

answered Oct 08 '22 at 13:10 by noseratio


This Medium article describes another solution, which is to use the Dasync/AsyncEnumerable library.

The library is open source, available on NuGet and GitHub, and provides a readable syntax for IAsyncEnumerable that can be used now, until C# 8.0 comes out with its own implementation and language support in the form of async ... yield return and await foreach.
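C# 8.0 did add that language support. The following is a minimal sketch of the question's GetSomeData rewritten as an async iterator, assuming .NET Core 3.0 or later (IAsyncEnumerable<T>, await foreach, await using). To stay self-contained it takes an already-executed DbDataReader instead of opening a SqlConnection, so it can be tried against an in-memory DataTableReader; AsyncDataSketch and ReadAllAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncDataSketch
{
    // Async iterator: each row is fetched with ReadAsync and projected only when
    // the caller's await foreach asks for the next element; no intermediate collection.
    public static async IAsyncEnumerable<T> ReadAllAsync<T>(
        DbDataReader rdr,
        Func<IDataRecord, T> projector,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // The reader is disposed asynchronously when the iteration ends or is abandoned.
        await using (rdr)
        {
            while (await rdr.ReadAsync(cancellationToken).ConfigureAwait(false))
                yield return projector(rdr);
        }
    }
}
```

A caller would write `await foreach (var x in AsyncDataSketch.ReadAllAsync(rdr, MyClass.MyClassFactory))`. With SQL Server, `rdr` could come from `await cmd.ExecuteReaderAsync(CommandBehavior.CloseConnection)` on an opened connection, so that disposing the reader also closes the connection; the command object would still need disposing separately.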

(I have no connection with the library; I came across it as a potentially very useful solution to what I think is the same problem as yours, on a project I'm developing.)

answered Oct 08 '22 at 14:10 by MikeBeaton