Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Hot Concat in Rx

Observable.Concat is an implementation that joins observables but the second IObservable<T> only makes the subscription when the first is completed.

http://www.introtorx.com/content/v1.0.10621.0/12_CombiningSequences.html#Concat

Is there any implementation of a "HotConcat"? Similar to Observable.Merge, but keeping the delivery order, first pushing the elements of initial subscription and then the subsequents. Something like: Hot Concat

I know that is possible to use the ReplaySubject<T>, but it doesn't seems so good, because of performance and memory usage impacts..

like image 272
J. Lennon Avatar asked Jul 16 '14 20:07

J. Lennon


1 Answers

Here's the implementation I've been using for a while. This implementation introduces a BufferUntilSubscribed operator that turns an IObservable into a IConnectableObservable that will start buffering whenever you call Connect and will deliver the buffered results to the first subscriber. Once the first subscriber has "caught up", then the buffering will stop and the subscriber will just be given the live events as they arrive.

Once you have that, you can write HotConcat as something like:

public static IObservable<T> HotConcat<T>(params IObservable<T>[] sources)
{
    var s2 = sources.Select(s => s.BufferUntilSubscribed());
    var subscriptions = new CompositeDisposable(s2.Select(s2 => s2.Connect()).ToArray());
    return Observable.Create<T>(observer =>
    {
        var s = new SingleAssignmentDisposable();
        var d = new CompositeDisposable(subscriptions);
        d.Add(s);

        s.Disposable = s2.Concat().Subscribe(observer);

        return d;
    });
}

Here's the implemementation of BufferUntilSubscribed:

private class BufferUntilSubscribedObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly IScheduler _scheduler;
    private readonly Subject<T> _liveEvents;
    private bool _observationsStarted;
    private Queue<T> _buffer;
    private readonly object _gate;

    public BufferUntilSubscribedObservable(IObservable<T> source, IScheduler scheduler)
    {
        _source = source;
        _scheduler = scheduler;
        _liveEvents = new Subject<T>();
        _buffer = new Queue<T>();
        _gate = new object();
        _observationsStarted = false;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_gate)
        {
            if (_observationsStarted)
            {
                return _liveEvents.Subscribe(observer);
            }

            _observationsStarted = true;

            var bufferedEvents = GetBuffers().Concat().Finally(RemoveBuffer); // Finally clause to remove the buffer if the first observer stops listening.
            return Observable.Merge(_liveEvents, bufferedEvents).Subscribe(observer);
        }
    }

    public IDisposable Connect()
    {
        return _source.Subscribe(OnNext, _liveEvents.OnError, _liveEvents.OnCompleted);
    }

    private void RemoveBuffer()
    {
        lock (_gate)
        {
            _buffer = null;
        }
    }

    /// <summary>
    /// Acquires a lock and checks the buffer.  If it is empty, then replaces it with null and returns null.  Else replaces it with an empty buffer and returns the old buffer.
    /// </summary>
    /// <returns></returns>
    private Queue<T> GetAndReplaceBuffer()
    {
        lock (_gate)
        {
            if (_buffer == null)
            {
                return null;
            }

            if (_buffer.Count == 0)
            {
                _buffer = null;
                return null;
            }

            var result = _buffer;
            _buffer = new Queue<T>();
            return result;
        }
    }

    /// <summary>
    /// An enumerable of buffers that will complete when a call to GetAndReplaceBuffer() returns a null, e.g. when the observer has caught up with the incoming source data.
    /// </summary>
    /// <returns></returns>
    private IEnumerable<IObservable<T>> GetBuffers()
    {
        Queue<T> buffer;
        while ((buffer = GetAndReplaceBuffer()) != null)
        {
            yield return buffer.ToObservable(_scheduler);
        }
    }

    private void OnNext(T item)
    {
        lock (_gate)
        {
            if (_buffer != null)
            {
                _buffer.Enqueue(item);
                return;
            }
        }

        _liveEvents.OnNext(item);
    }
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <param name="scheduler">Scheduler to use to dump the buffered data to the observer.</param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source, IScheduler scheduler)
{
    return new BufferUntilSubscribedObservable<T>(source, scheduler);
}

/// <summary>
/// Returns a connectable observable, that once connected, will start buffering data until the observer subscribes, at which time it will send all buffered data to the observer and then start sending new data.
/// Thus the observer may subscribe late to a hot observable yet still see all of the data.  Later observers will not see the buffered events.
/// </summary>
/// <param name="source"></param>
/// <returns></returns>
public static IConnectableObservable<T> BufferUntilSubscribed<T>(this IObservable<T> source)
{
    return new BufferUntilSubscribedObservable<T>(source, Scheduler.Immediate);
}
like image 125
Brandon Avatar answered Oct 01 '22 08:10

Brandon