I'm stuck with rx and a specific query. The problem:
Many single update operations are produced by continuous stream. The operations can be insert or delete. I want to buffer those streams and perform few operations at the time, but it is really important to preserve the order. Additionally, operations should be buffered and done in sequences every X seconds
Example:
In:
insert-insert-insert-delete-delete-insert-delete-delete-delete-delete
Out:
insert(3)-delete(2)-insert(1)-delete(4)
I wrote a simple application to test it, and it works more or less as I would but it doesn't respect the order of incoming insert/delete
namespace RxTests
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
internal class Program
{
private static readonly Random Random = new Random();
private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();
private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();
private static void Main(string[] args)
{
Console.WriteLine("Starting production");
var producerScheduler = new EventLoopScheduler();
var consumerScheduler = new EventLoopScheduler();
var producer =
Observable.Interval(TimeSpan.FromSeconds(2))
.SubscribeOn(producerScheduler)
.Subscribe(Produce, WriteProductionCompleted);
var consumer =
operations.ObserveOn(producerScheduler)
.GroupBy(operation => operation.Delete)
.SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
.SubscribeOn(consumerScheduler)
.Subscribe(WriteUpdateOperations);
Console.WriteLine("Type any key to stop");
Console.ReadKey();
consumer.Dispose();
producer.Dispose();
}
private static void Produce(long time)
{
var delete = Random.NextDouble() < 0.5;
Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time);
var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
var id = time + 1;
operations.OnNext(
new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
}
private static void WriteProductionCompleted()
{
Console.WriteLine("Production completed");
ProducerStopped.Cancel();
}
private static void WriteUpdateOperation(UpdateOperation updateOperation)
{
Console.WriteLine("Consuming {0}", updateOperation);
}
private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
{
foreach (var operation in updateOperation)
{
WriteUpdateOperation(operation);
}
}
private class UpdateOperation
{
public UpdateOperation(long id, bool delete, params string[] changes)
{
this.Id = id;
this.Delete = delete;
this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
}
public bool Delete { get; set; }
public long Id { get; private set; }
public IList<string> Changes { get; private set; }
public override string ToString()
{
var stringBuilder = new StringBuilder("{UpdateOperation ");
stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete);
if (this.Changes.Count > 0)
{
stringBuilder.Append(this.Changes.First());
foreach (var change in this.Changes.Skip(1))
{
stringBuilder.AppendFormat(", {0}", change);
}
}
stringBuilder.Append("]}");
return stringBuilder.ToString();
}
}
}
}
can anybody help me with the right query?
Thanks
UPDATE 08.03.13 (Suggestions by JerKimball)
The following lines are small changes/additions to JerKimball's code to print the results:
using(query.Subscribe(Print))
{
Console.ReadLine();
producer.Dispose();
}
Using the following Print methods:
private static void Print(IObservable<IList<Operation>> operations)
{
operations.Subscribe(Print);
}
private static void Print(IList<Operation> operations)
{
var stringBuilder = new StringBuilder("[");
if (operations.Count > 0)
{
stringBuilder.Append(operations.First());
foreach (var item in operations.Skip(1))
{
stringBuilder.AppendFormat(", {0}", item);
}
}
stringBuilder.Append("]");
Console.WriteLine(stringBuilder);
}
and the following to string for the Operation:
public override string ToString()
{
return string.Format("{0}:{1}", this.Type, this.Seq);
}
Order is preserved, but:
Methinks you can get what you're after with a mix of GroupByUntil
, DistinctUntilChanged
, and Buffer
:
This requires a bit of tweaking to fit your example code, but the query (and concept) should hold:
(edit: doh - missed a bit there...)
void Main()
{
var rnd = new Random();
var fakeSource = new Subject<Operation>();
var producer = Observable
.Interval(TimeSpan.FromMilliseconds(1000))
.Subscribe(i =>
{
var op = new Operation();
op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
fakeSource.OnNext(op);
});
var singleSource = fakeSource.Publish().RefCount();
var query = singleSource
// We want to groupby until we see a change in the source
.GroupByUntil(
i => i.Type,
grp => singleSource.DistinctUntilChanged(op => op.Type))
// then buffer up those observed events in the groupby window
.Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50));
using(query.Subscribe(Console.WriteLine))
{
Console.ReadLine();
producer.Dispose();
}
}
public class Operation {
private static int _cnt = 0;
public Operation() { Seq = _cnt++; }
public int Seq {get; set;}
public string Type {get; set;}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With