I would like to prevent an emission from occurring if and only if the same exact item was emitted within the last x milliseconds. I've looked at the throttle and debounce operators but I'm not sure if they can help me here. Is there another operator I can use, or can I compose them somehow?
You can do this with groupByUntil to essentially debounce individual items
o
.groupByUntil(x => x, x => x, x => Observable.timer(1000))
.flatMap(grp => grp.first())
As your question does not completely explains the scenario like compare next emitted value to last emitted value or any last emitted values or something else. I would take a general way to reach to the solution.
Example is in RxJava.
You can use timestamp()
with filter()
operator like following:
ArrayList<String> list = new ArrayList<>();
final long[] timeOfSubscribe = {-1};
final long timeDuration = 2 * 1000; // 2 seconds
Observable.fromIterable(list)
.timestamp()
.filter(item -> item.time() > (timeDuration + timeOfSubscribe[0]) && item.value().equals("your last value"))
.doOnSubscribe(__ -> timeOfSubscribe[0] = Calendar.getInstance().getTimeInMillis())
.subscribe();
I guess this snippet can help you just need to change your emitted value comparison login that is in filter()
operator. If you are looking for last emitted value you can stop last emitted value using doOnNext()
operator (to have simple case) or if your looking for all the last emitted values you need to store emitted values in list and check.
I hope it helps.
You could timestamp and pair off each item, and then check your boundary conditions of time and equality.
randomSource
.timestamp()
.pairwise()
.where(pair => pair[0].timestamp - pair[1].timestamp < limit && pair[0].value === pair[1].value);
Then apply a .select(pair => pair[0].value)
to get back your original item.
Working example in C#, with a source that generates random items between 1 and 5 spaced over random times:
static IObservable<T[]> Pairwise<T>(this IObservable<T> source)
{
source = source.Publish().RefCount();
return source.Skip(1).Zip(source, (a, b) => new[] { a, b });
}
static void Main(string[] args)
{
var randomSource =
Observable.Defer(() => Observable.Timer(TimeSpan.FromSeconds(new Random().NextDouble() * 2))).Repeat().Publish().RefCount().Select(_ => new Random().Next(1, 5));
var limit = TimeSpan.FromSeconds(1);
var sameDebounce =
randomSource
.Timestamp()
.Pairwise()
.Where(pair => pair[0].Timestamp - pair[1].Timestamp < limit && pair[0].Value == pair[1].Value);
sameDebounce.Subscribe(c => Console.WriteLine("{0} {1}", c[0], c[1]));
Console.ReadLine();
}
Output:
2@9/7/2017 5:00:04 AM +00:00 2@9/7/2017 5:00:04 AM +00:00
2@9/7/2017 5:00:09 AM +00:00 2@9/7/2017 5:00:08 AM +00:00
1@9/7/2017 5:00:23 AM +00:00 1@9/7/2017 5:00:23 AM +00:00
2@9/7/2017 5:00:33 AM +00:00 2@9/7/2017 5:00:32 AM +00:00
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With