Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Sort Observable by predefined order in Reactive Extensions

Say I have a type T:

class T {
    public int identifier; //Arbitrary but unique for each character (Guids in real-life)
    public char character; //In real life not a char, but I chose char here for easy demo purposes
}

And I have a predefined ordered sequence of identifiers:

int[] identifierSequence = new int[]{
    9, 3, 4, 4, 7
};

I now need to order an IObservable<T> which produces the following sequence of objects:

{identifier: 3, character 'e'},
{identifier: 9, character 'h'},
{identifier: 4, character 'l'},
{identifier: 4, character 'l'},
{identifier: 7, character 'o'}

So that the resulting IObservable produces hello. I don't want to use ToArray, as I want to receive objects as soon as they arrive and not wait until everything is observed. More specifically, I would like to receive them like this:

 Input: e  h  l  l  o
Output:    he l  l  o

What would be the proper reactive way to do this? The best I could come up with is this:

Dictionary<int, T> buffer = new Dictionary<int, T>();
int curIndex = 0;

inputObserable.SelectMany(item =>
{
    buffer[item.identifier] = item;

    IEnumerable<ReportTemplate> GetReadyElements()
    {
        while (true)
        {
            int nextItemIdentifier = identifierSequence[curIndex];
            T nextItem;
            if (buffer.TryGetValue(nextItemIdentifier, out nextItem))
            {
                buffer.Remove(nextItem.identifier);
                curIndex++;
                yield return nextItem;
            }
            else
            {
                break;
            }
        }
    }

    return GetReadyElements();
});

EDIT:

Schlomo raised some very valid issues with my code, which is why I marked his answer as correct. I made some modifications to his to code for it to be usable:

  • Generic identifier and object type
  • Iteration instead of recursion to prevent potential stackoverflow on very large observables
  • Convert the anonymous type to a real class for readability
  • Wherever possible, lookup a value in a dictionary only once and store as variable instead of looking it up multiple times
  • Fixed type

This gives me the following code:

public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
        return source.Scan(initialState, (oldState, item) =>
            {
                //Function to be called upon receiving new item
                //If we can pattern match the first item, then it is moved into Output, and concatted continuously with the next possible item
                //Otherwise, if nothing is available yet, just return the input state
                OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
                {
                    int index = state.Index;
                    ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
                    IList<T> output = new List<T>();

                    while (index < identifierSequence.Count)
                    {
                        TId key = identifierSequence[index];
                        ImmutableList<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
                        {
                            //No values available yet
                            break;
                        }

                        T toOutput = nextValues[nextValues.Count - 1];
                        output.Add(toOutput);

                        buffer = buffer.SetItem(key, nextValues.RemoveAt(nextValues.Count - 1));
                        index++;
                    }

                    return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
                }

                //Before calling the recursive function, add the new item to the buffer
                TId itemIdentifier = identifierFunc(item);

                ImmutableList<T> valuesList;
                if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
                {
                    valuesList = ImmutableList<T>.Empty;
                }
                var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

                return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>()));
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();

                if (output.Index == identifierSequence.Count)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }

                return notifications;
            })
            .Dematerialize();
    }

    class OrderByIdentifierSequenceState<T, TId>
    {
        //Index shows what T we're waiting on
        public int Index { get; }
        //Buffer holds T that have arrived that we aren't ready yet for
        public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }
        //Output holds T that can be safely emitted.
        public IEnumerable<T> Output { get; }

        public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
        {
            this.Index = index;
            this.Buffer = buffer;
            this.Output = output;
        }
    }

However, this code still has a couple of problems:

  • Constant copying of the state (mainly the ImmutableDictionary), which can be very expensive. Possible solution: maintain a separate state per observer, instead of per item received.
  • When one or more of the elements in identifierSequence are not present in the source observable a problem appears. This currently blocks the ordered observable and it will never finish. Possible solutions: Timeout, throw exception when source observable is completed, return all available items when source observable is completed, ...
  • When the source observable contains more elements than identifierSequence, we get a memory leak. Items that are in the source observable, but not in identifierSequence currently get added to the dictionary, but will not be deleted before the source observable completes. This is a potential memory leak. Possible solutions: check whether the item is in identifierSequence before adding it to the dictionary, bypass code and immediately output the item, ...

MY SOLUTION:

    /// <summary>
    /// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
    /// If an item is missing from the source observable, the returned obserable returns items up until the missing item and then blocks until the source observable is completed.
    /// All available items are then returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable.
    /// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
    /// </summary>
    /// <typeparam name="T">The type that is produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
    /// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
    /// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        if (identifierSequence.Count == 0)
        {
            return Observable.Empty<T>();
        }

        HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);

        return Observable.Create<T>(observer =>
        {
            //current index of pending item in identifierSequence
            int index = 0;
            //buffer of items we have received but are not ready for yet
            Dictionary<TId, List<T>> buffer = new Dictionary<TId, List<T>>();

            return source.Select(
                    item =>
                    {
                        //Function to be called upon receiving new item
                        //We search for the current pending item in the buffer. If it is available, we yield return it and repeat.
                        //If it is not available yet, stop.
                        IEnumerable<T> GetAvailableOutput()
                        {
                            while (index < identifierSequence.Count)
                            {
                                TId key = identifierSequence[index];
                                List<T> nextValues;
                                if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
                                {
                                    //No values available yet
                                    break;
                                }

                                yield return nextValues[nextValues.Count - 1];

                                nextValues.RemoveAt(nextValues.Count - 1);
                                index++;
                            }
                        }

                        //Get the identifier for this item
                        TId itemIdentifier = identifierFunc(item);

                        //If this item is not in identifiersInSequence, we ignore it.
                        if (!identifiersInSequence.Contains(itemIdentifier))
                        {
                            return Enumerable.Empty<T>();
                        }

                        //Add the new item to the buffer
                        List<T> valuesList;
                        if (!buffer.TryGetValue(itemIdentifier, out valuesList))
                        {
                            valuesList = new List<T>();
                            buffer[itemIdentifier] = valuesList;
                        }
                        valuesList.Add(item);

                        //Return all available items
                        return GetAvailableOutput();
                    })
                .Subscribe(output =>
                {
                    foreach (T cur in output)
                    {
                        observer.OnNext(cur);
                    }

                    if (index == identifierSequence.Count)
                    {
                        observer.OnCompleted();
                    }
                },(ex) =>
                {
                    observer.OnError(ex);
                }, () =>
                {
                    //When source observable is completed, return the remaining available items
                    while (index < identifierSequence.Count)
                    {
                        TId key = identifierSequence[index];
                        List<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
                        {
                            //No values available
                            index++;
                            continue;
                        }

                        observer.OnNext(nextValues[nextValues.Count - 1]);

                        nextValues.RemoveAt(nextValues.Count - 1);
                        index++;
                    }

                    //Mark observable as completed
                    observer.OnCompleted();
                });
        });
    }
like image 257
Wouter Avatar asked Aug 02 '17 15:08

Wouter


2 Answers

Please note that your implementation has a few problems:

  1. If the two 'l's come before their time, one gets swallowed, then holding up the whole sequence. Your dictionary should map to a collection, not a single item.
  2. There's no OnCompleted message.
  3. Multiple subscribers can screw up the state. Try this (where GetPatternMatchOriginal is your code):

-

var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });

stateMachine.Take(3).Dump(); //Linqpad
stateMachine.Take(3).Dump(); //Linqpad

The first ouptut is h e l the second output is l o. They should both output h e l.

This implementation fixes those problems, and also is side-effect free using immutable data structures:

public static class X
{
    public static IObservable<T> GetStateMachine(this IObservable<T> source, string identifierSequence)
    {
        //State is held in an anonymous type: 
        //  Index shows what character we're waiting on, 
        //  Buffer holds characters that have arrived that we aren't ready yet for
        //  Output holds characters that can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
            (state, item) =>
            {
                //Function to be called recursively upon receiving new item
                //If we can pattern match the first item, then it is moved into Output, and concatted recursively with the next possible item
                //Otherwise just return the inputs
                (int Index, ImmutableDictionary<int, ImmutableList<T>> Buffer, IEnumerable<T> Output) GetOutput(int index, ImmutableDictionary<int, ImmutableList<T>> buffer, IEnumerable<T> results)
                {
                    if (index == identifierSequence.Length)
                        return (index, buffer, results);

                    var key = identifierSequence[index];
                    if (buffer.ContainsKey(key) && buffer[key].Any())
                    {
                        var toOuptut = buffer[key][buffer[key].Count - 1];
                        return GetOutput(index + 1, buffer.SetItem(key, buffer[key].RemoveAt(buffer[key].Count - 1)), results.Concat(new[] { toOuptut }));
                    }
                    else
                        return (index, buffer, results);
                }

                //Before calling the recursive function, add the new item to the buffer
                var modifiedBuffer = state.Buffer.ContainsKey(item.Identifier)
                   ? state.Buffer
                   : state.Buffer.Add(item.Identifier, ImmutableList<T>.Empty);

                var remodifiedBuffer = modifiedBuffer.SetItem(item.Identifier, modifiedBuffer[item.Identifier].Add(item));

                var output = GetOutput(state.Index, remodifiedBuffer, Enumerable.Empty<T>());
                return new { Index = output.Index, Buffer = output.Buffer, Output = output.Output };
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Length)
                    notifications.Add(Notification.CreateOnCompleted<T>());
                return notifications;
            })
            .Dematerialize();
    }
}

then you can call it like so:

var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad

src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 4, Character = 'l' });
src.OnNext(new T { Identifier = 7, Character = 'o' });
src.OnNext(new T { Identifier = 3, Character = 'e' });
src.OnNext(new T { Identifier = 9, Character = 'h' });
like image 167
Shlomo Avatar answered Nov 14 '22 23:11

Shlomo


Given you have this:

IObservable<T> source = new []
{
    new T() { identifier = 3, character = 'e' },
    new T() { identifier = 9, character = 'h'},
    new T() { identifier = 4, character = 'l'},
    new T() { identifier = 4, character = 'l'},
    new T() { identifier = 7, character = 'o'}
}.ToObservable();

int[] identifierSequence = new int[]
{
    9, 3, 4, 4, 7
};

...then this works:

IObservable<T> query =
    source
        .Scan(new { index = 0, pendings = new List<T>(), outputs = new List<T>() }, (a, t) =>
        {
            var i = a.index;
            var o = new List<T>();
            a.pendings.Add(t);
            var r = a.pendings.Where(x => x.identifier == identifierSequence[i]).FirstOrDefault();
            while (r != null)
            {
                o.Add(r);
                a.pendings.Remove(r);
                i++;
                r = a.pendings.Where(x => x.identifier == identifierSequence[i]).FirstOrDefault();
            }
            return new { index = i, a.pendings, outputs = o };
        })
        .SelectMany(x => x.outputs);
like image 29
Enigmativity Avatar answered Nov 14 '22 22:11

Enigmativity