According to the documentation of groupBy:
Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(0) to them.
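To make the note concrete, here is a minimal sketch of the caching behaviour it describes. This is a toy model only: ToyGroup, emit, discard and bufferedAfterIgnoring are hypothetical names invented for illustration, and this is not how RxJava's GroupedObservable is actually implemented. It just shows why a group that is never subscribed to keeps accumulating items, while a group that has been told to discard does not.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: a hypothetical stand-in for the caching behaviour described
// in the note above, NOT RxJava's GroupedObservable.
class ToyGroupDemo {

    static class ToyGroup<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private Consumer<T> subscriber;   // null until someone subscribes
        private boolean discarded;

        /** Called by the upstream for every item routed to this group. */
        void emit(T item) {
            if (discarded) {
                return;                   // the group was told to drop its items
            }
            if (subscriber != null) {
                subscriber.accept(item);  // deliver directly once subscribed
            } else {
                buffer.add(item);         // otherwise cache until subscription
            }
        }

        /** Replays the cached items to the subscriber, then delivers live ones. */
        void subscribe(Consumer<T> s) {
            subscriber = s;
            while (!buffer.isEmpty()) {
                s.accept(buffer.poll());
            }
        }

        /** Signals that nobody is interested: drop the cache and future items. */
        void discard() {
            discarded = true;
            buffer.clear();
        }

        int buffered() {
            return buffer.size();
        }
    }

    /** Routes `count` items to a group nobody subscribes to; returns the cache size. */
    static int bufferedAfterIgnoring(int count, boolean discard) {
        ToyGroup<Integer> group = new ToyGroup<>();
        if (discard) {
            group.discard();
        }
        for (int i = 0; i < count; i++) {
            group.emit(i);
        }
        return group.buffered();
    }

    public static void main(String[] args) {
        System.out.println(bufferedAfterIgnoring(1000, false)); // prints 1000
        System.out.println(bufferedAfterIgnoring(1000, true));  // prints 0
    }
}
```

In this model, simply ignoring the group leaves all 1000 items sitting in its buffer, which is the kind of growth the note warns about.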
There's an RxJava tutorial which says:
Internally, every Rx operator does 3 things:
- It subscribes to the source and observes the values.
- It transforms the observed sequence according to the operator's purpose.
- It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
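The three steps above can be sketched with a hand-rolled filter. This is a toy model under stated assumptions: ToyObserver, ToySource, range, filter and collect are hypothetical types and helpers written only for this illustration, and they are not RxJava's classes or its real operator implementation. One thing the sketch makes visible is that the operator subscribes to its source only; it never subscribes to the values it observes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: these types are hypothetical and are NOT RxJava's classes.
class ToyOperatorDemo {

    /** Minimal observer with the three callbacks named in the tutorial. */
    interface ToyObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Minimal source: something an observer can subscribe to. */
    interface ToySource<T> {
        void subscribe(ToyObserver<T> observer);
    }

    /** Emits start, start + 1, ..., start + count - 1, then completes. */
    static ToySource<Integer> range(int start, int count) {
        return observer -> {
            for (int i = start; i < start + count; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        };
    }

    /** A filter "operator" built from the three steps. */
    static <T> ToySource<T> filter(ToySource<T> source, Predicate<T> keep) {
        // Step 1: subscribe to the source and observe its values.
        return downstream -> source.subscribe(new ToyObserver<T>() {
            @Override public void onNext(T value) {
                if (keep.test(value)) {        // Step 2: transform (drop non-matching values)
                    downstream.onNext(value);  // Step 3: push to our own subscriber
                }
            }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Subscribes to a source and collects everything it emits into a list. */
    static <T> List<T> collect(ToySource<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(new ToyObserver<T>() {
            @Override public void onNext(T value) { out.add(value); }
            @Override public void onError(Throwable error) { throw new RuntimeException(error); }
            @Override public void onCompleted() { }
        });
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 2, 4, 6, 8]
        System.out.println(collect(filter(range(0, 10), i -> i % 2 == 0)));
    }
}
```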
Let's take a look at the following code block, which extracts only the even numbers from range(0, 10):
Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .filter(g -> g.getKey() % 2 == 0)
    .flatMap(g -> g)
    .subscribe(System.out::println, Throwable::printStackTrace);
My questions are:
- Does this mean the filter operator already implies a subscription to every group produced by groupBy, or only to the outer Observable<GroupedObservable>?
- Will there be a memory leak in this case?
- If so, how do I properly discard those groups? Should I replace filter with a custom operator that does a take(0) followed by a return of Observable.empty()? You may ask why I don't just return take(0) directly: it's because filter doesn't necessarily follow right after groupBy, but can be anywhere in the chain and involve more complex conditions.
A memory leak starts when a program requests a chunk of memory from the operating system for itself and its data. As the program runs, it sometimes needs more memory and makes additional requests. The leak occurs when memory that is no longer needed is never released.
To avoid memory leaks, memory allocated on the heap should always be freed when it is no longer needed.
Memory profilers are tools that can monitor memory usage and help detect memory leaks in an application. Profilers can also help with analyzing how resources are allocated within an application, for example how much memory and CPU time is being used by each method.
One way to check for a memory leak in your application is to watch its RAM usage and compare the total amount of memory being used against the total amount available. It is also advisable to capture heap dump snapshots while the application runs in a production environment.
Apart from the memory leak, the current implementation may end up hanging completely due to internal request coordination problems.
Note that with take(0), the group may be recreated all the time. I'd use ignoreElements instead: it drops the values, so no items reach flatMap, and the group itself won't be recreated all the time.
Your suspicions are correct: to properly handle the grouped observable, each of the inner observables (g) must be subscribed to. Because filter subscribes only to the outer observable, using it here is a bad idea. Just do what you need in the flatMap, using ignoreElements to filter out undesired groups.
Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
        if (g.getKey() % 2 == 0)
            return g;
        else
            return g.ignoreElements();
    })
    .subscribe(System.out::println, Throwable::printStackTrace);