Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How Async streams compares to reactive extension?

How to compare the following two? Is Rx more powerful?

Reactive extension:

var observable = Observable.Create<char>(async (observer, cancel) =>
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        observer.OnNext(line);
    }
});

observable.Subscribe(
    c => Console.WriteLine(c.ToString()),
    () => end.Dispose());

Async streams:

public async void Run(string path)
{
    await foreach (var line in TestAsync())
    {
        Console.WriteLine(line);
    }
}

private async IAsyncEnumerable<string> TestAsync()
{
    while (true)
    {
        string line = await sr.ReadLineAsync();
        if (line == null)
            break;
        yield return line;
    }
}
like image 485
ca9163d9 Avatar asked Oct 20 '19 09:10

ca9163d9


People also ask

Is reactive programming asynchronous?

Reactive programming describes a design paradigm that relies on asynchronous programming logic to handle real-time updates to otherwise static content. It provides an efficient means -- the use of automated data streams -- to handle data updates to content whenever a user makes an inquiry.

What is the purpose of reactive extension?

ReactiveX (also known as Reactive Extensions) is a software library originally created by Microsoft that allows imperative programming languages to operate on sequences of data regardless of whether the data is synchronous or asynchronous.

What is stream in reactive programming?

Reactive programming A stream is a sequence of ongoing events (state changes) ordered in time. Streams can emit three different things: a value (of some type), an error, or a "completed" signal.

When should you use reactive programming?

One of the most common use-case for using RP is when you need handling of asynchronous data streams. Yet, this is a tricky question to answer because even a minuscule difference in the use-case might become a deciding factor in our choice.


1 Answers

The two features work together. PS: Forget about async streams, think about await foreach.

Async streams

Async streams are a (relatively) low level feature that allows asynchronous iteration. By itself, it doesn't offer any other capabilities like filtering, aggregation etc. It's pull based while Rx is push based.

You can use LINQ operators on an async stream through the System.Linq.Async library found in ..... the ReacticeX.NET Github repo. It's fast, but doesn't offer the event processing functionality of Rx.

There's no notion of time time for example, much less a way to use a custom scheduler. There are no subscriptions, no error events. GroupBy will consume the entire source and emit group items as separate IAsyncEnumerable instances, while Rx's GroupBy will emit separate Observables for each group.

In the question's example, IAsyncEnumerable is a natural fit since there's no event logic involved, just iterating over an asynchronous iterator.

If the example tried to poll eg a remote service and detect failure spikes (ie more failures per interval than a threshold) IAsyncEnumerable would be inappropriate as it would block waiting for all responses. In fact, we could't aggregate events per time at all.

Threading

None really - an IAsyncEnumerable or await foreach call don't specify how events are produced or consumed. If we want to use a separate task to process an item, we have to create it ourselves, eg :

public async Task Run(string path)
{
    await foreach (var line in LoadStockTrades())
    {
        var result = await Task.Run(()=>AnalyzeTrade(line));
        Console.WriteLine($"{result} : {line});
    }
}

Reactive Extensions

Reactive Extensions is a high level library that deals with event streams. It's push based, it understands time, but it's also slower than lower-level constructs like Async Streams or Channels.

In the question's example, Rx would be overkill. Polling and detecting spikes though is easy, with multiple windowing options.

System.Linq.Async can create an Observable from an IAsyncEnumerable with ToObservable, which means an IAsyncEnumerable can be used as a source for Rx.

Threading

By default, Rx is single threaded, which makes perfect sense for its main scenario - event stream processing.

On the other hand, Rx allows the publisher, subscriber and operators to run on the same or separate threads. In languages that don't have async/await or DataFlow (eg Java,JavaScript), Rx is used to emulate concurrent processing pipelines by running the publisher and subscribers on different threads.

like image 96
Panagiotis Kanavos Avatar answered Sep 29 '22 21:09

Panagiotis Kanavos