
Unwrapping IObservable<Task<T>> into IObservable<T> with order preservation

Is there a way to unwrap an IObservable<Task<T>> into an IObservable<T> while keeping the same order of events, like this?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

Let's say I have a desktop application that consumes a stream of messages, some of which require heavy post-processing:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

I can think of two ways to deal with that.

First, I can subscribe to streamOfTasks using an asynchronous event handler:

streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

Second, I can convert streamOfTasks using Observable.Create, like this:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<Result>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnCompleted()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

Either way, the order of messages is not preserved: some later messages that don't need any post-processing come out faster than earlier messages that require post-processing. Both my solutions handle the incoming messages in parallel, but I'd like them to be processed sequentially, one by one.

I could write a simple task queue that processes just one task at a time, but perhaps that's overkill. It seems to me that I'm missing something obvious.


Update: I wrote a sample console program to demonstrate my approaches. None of the solutions so far preserves the original order of events. Here is the output of the program:

Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

Here is the complete source code:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}
asked Apr 10 '17 02:04 by yallie


3 Answers

Combining @Enigmativity's simple approach with @VMAtm's idea of attaching the counter and some code snippets from this SO question, I came up with this solution:

// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));

processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));

// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....

Here is my SelectAsync extension method. It projects each element of an IObservable<TSource> through an asynchronous selector and returns an IObservable<TResult> that keeps the original order of events:

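// requires: System, System.Collections.Generic, System.Reactive.Linq,
// System.Threading (for Interlocked), System.Threading.Tasks;
// as an extension method, it must be declared inside a static class
public static IObservable<TResult> SelectAsync<TSource, TResult>(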
    this IObservable<TSource> src,
    Func<TSource, Task<TResult>> selectorAsync)
{
    // using local variable for counter is easier than src.Scan(...)
    var counter = 0;
    var streamOfTasks =
        from source in src
        from result in Observable.FromAsync(async () => new
        {
            Index = Interlocked.Increment(ref counter) - 1,
            Result = await selectorAsync(source)
        })
        select result;

    // buffer the results coming out of order
    return Observable.Create<TResult>(observer =>
    {
        var index = 0;
        var buffer = new Dictionary<int, TResult>();

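        return streamOfTasks.Subscribe(
            item =>
            {
                buffer.Add(item.Index, item.Result);

                // emit every buffered result that is next in line
                TResult result;
                while (buffer.TryGetValue(index, out result))
                {
                    buffer.Remove(index);
                    observer.OnNext(result);
                    index++;
                }
            },
            observer.OnError,       // forward errors to the subscriber
            observer.OnCompleted);  // forward completion to the subscriber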
    });
}

I'm not particularly satisfied with my solution, as it looks too complex to me, but at least it doesn't require any external dependencies. I'm using a simple Dictionary to buffer and reorder the task results because the subscriber doesn't need to be thread-safe (its OnNext handler is never called concurrently).

Any comments or suggestions are welcome. I'm still hoping to find a native Rx way of doing this without a custom buffering extension method.
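One candidate for a built-in route, offered as a sketch rather than a verified replacement for SelectAsync: System.Reactive.Linq has a Concat overload that accepts an IObservable<Task<T>> and emits the task results in source order. In the sketch below the class name, the timings, and the shortened PostprocessAsync are illustrative assumptions, and async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class ConcatTasksSketch
{
    // Stand-in for the question's PostprocessAsync: every third value is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(300));
        }
        return x;
    }

    static async Task Main()
    {
        // Each task starts as soon as its source element arrives,
        // so the post-processing itself still overlaps.
        IObservable<Task<long>> streamOfTasks = Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Take(6)
            .Select(x => PostprocessAsync(x));

        // Concat over a stream of tasks releases results in source order:
        // a fast later result waits until the earlier tasks have finished.
        IList<long> results = await streamOfTasks.Concat().ToList();

        Console.WriteLine(string.Join(", ", results));
    }
}
```

With these assumed timings the values should come out as 0, 1, 2, 3, 4, 5. As with the Dictionary buffer above, only the output is ordered; the tasks themselves still run concurrently.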

answered Oct 18 '22 03:10 by yallie


Is the following simple approach an answer for you?

IObservable<Result> streamOfResults =
    from msg in streamOfMessages
    from result in Observable.FromAsync(() => PostprocessAsync(msg))
    select result;
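A caveat on the query above: two from clauses compile to SelectMany, which merges the inner sequences as they finish, and the sample output in the question shows this variant emitting results out of order. A minimal sketch of a sequential variant follows, assuming it is acceptable to start each post-processing call only after the previous one has finished. The class name, the timings, and the shortened PostprocessAsync are illustrative assumptions; async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class SequentialSketch
{
    // Stand-in for the question's PostprocessAsync: every third value is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(300));
        }
        return x;
    }

    static async Task Main()
    {
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(6);

        // FromAsync is deferred: the task is created only on subscription.
        // Concat subscribes to the next inner sequence only after the
        // previous one completes, so items are processed one by one.
        IObservable<long> results = source
            .Select(x => Observable.FromAsync(() => PostprocessAsync(x)))
            .Concat();

        await results.ForEachAsync(x => Console.WriteLine($"Processed: {x}"));
    }
}
```

The trade-off is throughput: a slow item delays the start of every item behind it, whereas the merging version starts all calls immediately.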
answered Oct 18 '22 05:10 by Enigmativity


To maintain the order of events, you can funnel your stream into a TransformBlock from TPL Dataflow. The TransformBlock executes your post-processing logic and maintains the order of its output by default.

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

namespace HandlingStreamInOrder {

    [TestFixture]
    public class ItemHandlerTests {

        [Test]
        public async Task Items_Are_Output_In_The_Same_Order_As_They_Are_Input() {
            var itemHandler = new ItemHandler();
            var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
            timerEvents.Subscribe(async x => {
                var data = (int)x;
                Console.WriteLine($"Value Produced: {x}");                
                var dataAccepted = await itemHandler.SendAsync((int)data);
                if (dataAccepted) {
                    InputItems.Add(data);
                }                
            });

            await Task.Delay(5000);
            itemHandler.Complete();
            await itemHandler.Completion;

            CollectionAssert.AreEqual(InputItems, itemHandler.OutputValues);
        }

        private IList<int> InputItems {
            get;
        } = new List<int>();
    }

    public class ItemHandler {


        public ItemHandler() {            
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = DataflowBlockOptions.Unbounded,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                EnsureOrdered = true
            };
            PostProcessBlock = new TransformBlock<int, int>((Func<int, Task<int>>)PostProcess, options);

            var output = PostProcessBlock.AsObservable().Subscribe(x => {
                Console.WriteLine($"Value Output: {x}");
                OutputValues.Add(x);
            });
        }

        public async Task<bool> SendAsync(int data) {
            return await PostProcessBlock.SendAsync(data);
        }

        public void Complete() {
            PostProcessBlock.Complete();
        }

        public Task Completion {
            get { return PostProcessBlock.Completion; }
        }

        public IList<int> OutputValues {
            get;
        } = new List<int>();

        private IPropagatorBlock<int, int> PostProcessBlock {
            get;
        }

        private async Task<int> PostProcess(int data) {
            if (data % 3 == 0) {
                await Task.Delay(TimeSpan.FromSeconds(2));
            }            
            return data;
        }
    }
}
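For readers who only need an IObservable<T> back, the same idea can be sketched more compactly with the AsObserver and AsObservable extension methods from System.Threading.Tasks.Dataflow. This is a condensed illustration under assumptions, not a drop-in replacement for the test above: the class name, the timings, and the shortened PostprocessAsync are hypothetical, and async Main assumes C# 7.1 or later.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class DataflowBridgeSketch
{
    // Stand-in for the post-processing step: every third value is slow.
    static async Task<long> PostprocessAsync(long x)
    {
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(300));
        }
        return x;
    }

    static async Task Main()
    {
        var block = new TransformBlock<long, long>(
            x => PostprocessAsync(x),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                EnsureOrdered = true // the default, stated for clarity
            });

        // Subscribe to the block's output before feeding it any input.
        Task printed = block.AsObservable()
            .ForEachAsync(x => Console.WriteLine($"Value Output: {x}"));

        // AsObserver forwards OnNext into the block and completes the
        // block when the source sequence completes.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(6);
        using (source.Subscribe(block.AsObserver()))
        {
            await printed; // finishes once the block has drained
        }
    }
}
```

Unlike a strictly sequential pipeline, the block still processes several items in parallel and only reorders the output.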
answered Oct 18 '22 04:10 by JSteward