
Rx groupby until condition changed

I'm stuck with Rx and a specific query. The problem:

Many single update operations are produced by a continuous stream. Each operation is either an insert or a delete. I want to buffer these operations and perform several at a time, but it is really important to preserve their order. Additionally, the buffered operations should be flushed in sequence every X seconds.

Example:

In:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete

Out:

insert(3)-delete(2)-insert(1)-delete(4)
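
To make the expected grouping concrete, here is a minimal sketch in plain C# (no Rx and no time window) that collapses consecutive operations of the same kind while keeping their order. The names `RunGrouping` and `Runs` are hypothetical and exist only for this illustration; operations are assumed to be plain strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RunGrouping
{
    // Collapses consecutive equal items into (item, count) pairs, preserving order.
    public static IEnumerable<KeyValuePair<string, int>> Runs(IEnumerable<string> operations)
    {
        string current = null;
        var count = 0;
        foreach (var op in operations)
        {
            // A different operation ends the current run.
            if (count > 0 && op != current)
            {
                yield return new KeyValuePair<string, int>(current, count);
                count = 0;
            }

            current = op;
            count++;
        }

        // Emit the last run, if any.
        if (count > 0)
        {
            yield return new KeyValuePair<string, int>(current, count);
        }
    }

    public static void Main()
    {
        var input = "insert-insert-insert-delete-delete-insert-delete-delete-delete-delete".Split('-');
        var output = string.Join("-", Runs(input).Select(r => r.Key + "(" + r.Value + ")"));
        Console.WriteLine(output); // insert(3)-delete(2)-insert(1)-delete(4)
    }
}
```

The Rx query I'm looking for should produce the same runs, but on a live stream and with a time/count limit on each batch.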

I wrote a simple application to test it. It works more or less as I want, but it doesn't respect the order of the incoming inserts/deletes:

namespace RxTests
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;

internal class Program
{
    private static readonly Random Random = new Random();

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();

    private static void Main(string[] args)
    {
        Console.WriteLine("Starting production");
        var producerScheduler = new EventLoopScheduler();
        var consumerScheduler = new EventLoopScheduler();
        var producer =
            Observable.Interval(TimeSpan.FromSeconds(2))
                      .SubscribeOn(producerScheduler)
                      .Subscribe(Produce, WriteProductionCompleted);
        var consumer =
            operations.ObserveOn(producerScheduler)
                      .GroupBy(operation => operation.Delete)
                      .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
                      .SubscribeOn(consumerScheduler)
                      .Subscribe(WriteUpdateOperations);
        Console.WriteLine("Type any key to stop");
        Console.ReadKey();
        consumer.Dispose();
        producer.Dispose();
    }

    private static void Produce(long time)
    {
        var delete = Random.NextDouble() < 0.5;
        Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time);
        var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
        var id = time + 1;
        operations.OnNext(
            new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
    }

    private static void WriteProductionCompleted()
    {
        Console.WriteLine("Production completed");
        ProducerStopped.Cancel();
    }

    private static void WriteUpdateOperation(UpdateOperation updateOperation)
    {
        Console.WriteLine("Consuming {0}", updateOperation);
    }

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
    {
        foreach (var operation in updateOperation)
        {
            WriteUpdateOperation(operation);
        }
    }

    private class UpdateOperation
    {
        public UpdateOperation(long id, bool delete, params string[] changes)
        {
            this.Id = id;
            this.Delete = delete;
            this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
        }

        public bool Delete { get; set; }

        public long Id { get; private set; }

        public IList<string> Changes { get; private set; }

        public override string ToString()
        {
            var stringBuilder = new StringBuilder("{UpdateOperation ");
            stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete);
            if (this.Changes.Count > 0)
            {
                stringBuilder.Append(this.Changes.First());
                foreach (var change in this.Changes.Skip(1))
                {
                    stringBuilder.AppendFormat(", {0}", change);
                }
            }

            stringBuilder.Append("]}");
            return stringBuilder.ToString();
        }
    }
}

}

Can anybody help me with the right query?

Thanks

UPDATE 08.03.13 (Suggestions by JerKimball)

The following lines are small changes/additions to JerKimball's code to print the results:

using(query.Subscribe(Print))
{
    Console.ReadLine();
    producer.Dispose();        
}

Using the following Print methods:

private static void Print(IObservable<IList<Operation>> operations)
{
    operations.Subscribe(Print);
}

private static void Print(IList<Operation> operations)
{
    var stringBuilder = new StringBuilder("[");
    if (operations.Count > 0)
    {
        stringBuilder.Append(operations.First());
        foreach (var item in operations.Skip(1))
        {
            stringBuilder.AppendFormat(", {0}", item);
        }
    }

    stringBuilder.Append("]");
    Console.WriteLine(stringBuilder);
}

and the following ToString override for Operation:

public override string ToString()
{
    return string.Format("{0}:{1}", this.Type, this.Seq);
}

Order is preserved, but:

  • I'm not sure about subscribing within another subscription: is that correct? (I've had this question for a long time and it was never clear to me.)
  • I never get more than two elements in each list, even if the stream produces more than two consecutive values with the same Type.
asked Mar 07 '13 11:03 by fra


1 Answer

Methinks you can get what you're after with a mix of GroupByUntil, DistinctUntilChanged, and Buffer:

This requires a bit of tweaking to fit your example code, but the query (and concept) should hold:

(edit: doh - missed a bit there...)

void Main()
{
    var rnd = new Random();
    var fakeSource = new Subject<Operation>();
    var producer = Observable
        .Interval(TimeSpan.FromMilliseconds(1000))
        .Subscribe(i => 
            {
                var op = new Operation();
                op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
                fakeSource.OnNext(op);
            });    
    var singleSource = fakeSource.Publish().RefCount();

    var query = singleSource
        // We want to groupby until we see a change in the source
        .GroupByUntil(
               i => i.Type, 
               grp => singleSource.DistinctUntilChanged(op => op.Type))
        // then buffer up those observed events in the groupby window
        .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
        producer.Dispose();        
    }
}

public class Operation { 
    private static int _cnt = 0;
    public Operation() { Seq = _cnt++; }
    public int Seq {get; set;}
    public string Type {get; set;}    
}
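
A possible explanation for the two-element lists mentioned in the question's update, offered as an assumption rather than something verified against this exact code: DistinctUntilChanged always passes through the first value it sees, so each group's duration sequence fires on the very next source element, whether or not Type changed. A hedged sketch of one alternative is to close a group only when an element with a different key arrives, and to flatten with SelectMany so no nested Subscribe is needed. The names `RunBatching` and `BatchRuns` are hypothetical, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class Operation
{
    public int Seq { get; set; }
    public string Type { get; set; }

    public override string ToString()
    {
        return string.Format("{0}:{1}", this.Type, this.Seq);
    }
}

public static class RunBatching
{
    // Hypothetical helper: batches consecutive operations of the same Type.
    public static IObservable<IList<Operation>> BatchRuns(IObservable<Operation> source)
    {
        // Share one subscription between the grouping and the duration selectors.
        var shared = source.Publish().RefCount();

        return shared
            // Assumption: a group should end when an element with another Type shows up.
            .GroupByUntil(op => op.Type, grp => shared.Where(op => op.Type != grp.Key))
            // Flatten the per-group buffers instead of subscribing inside a subscription.
            .SelectMany(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50))
            // Timer ticks can yield empty buffers; drop them.
            .Where(batch => batch.Count > 0);
    }

    public static void Main()
    {
        var source = new Subject<Operation>();
        using (BatchRuns(source).Subscribe(batch => Console.WriteLine("[{0}]", string.Join(", ", batch))))
        {
            var types = "insert-insert-insert-delete-delete-insert-delete-delete-delete-delete".Split('-');
            for (var i = 0; i < types.Length; i++)
            {
                source.OnNext(new Operation { Seq = i, Type = types[i] });
            }

            source.OnCompleted();
        }
    }
}
```

This relies on the grouping operator seeing each element before the duration sequences do, which should follow from it subscribing to the shared source first; treat that ordering as something to confirm in your own setup. A run longer than 50 elements or 8 seconds is split into several consecutive batches of the same Type.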
answered Oct 01 '22 14:10 by JerKimball