
groupBy, filter and memory leak in Rx

According to the documentation of groupBy:

Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(int)(0) to them.

There's a RxJava tutorial which says:

Internally, every Rx operator does 3 things

  1. It subscribes to the source and observes the values.
  2. It transforms the observed sequence according to the operator's purpose.
  3. It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
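To make the three steps concrete, here is a minimal sketch of a filter-like operator in plain Java. The `Observer`, `Source`, `range`, `filter` and `collect` names are hypothetical stand-ins written for this illustration; they are not RxJava's classes, and the real operators also deal with unsubscription and backpressure, which this toy model leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: these types are hypothetical and are not RxJava classes.
final class OperatorSketch {

    /** Minimal observer with the three callbacks named in the tutorial. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Minimal source: something an observer can subscribe to. */
    interface Source<T> {
        void subscribe(Observer<T> downstream);
    }

    /** Emits the integers start .. start + count - 1, then completes. */
    static Source<Integer> range(int start, int count) {
        return downstream -> {
            for (int i = start; i < start + count; i++) {
                downstream.onNext(i);
            }
            downstream.onCompleted();
        };
    }

    /** A filter operator written as the three steps from the list above. */
    static <T> Source<T> filter(Source<T> upstream, Predicate<T> keep) {
        // Step 1: subscribe to the source and observe its values.
        return downstream -> upstream.subscribe(new Observer<T>() {
            @Override public void onNext(T value) {
                if (keep.test(value)) {       // Step 2: apply the operator's logic.
                    downstream.onNext(value); // Step 3: push to our own subscriber.
                }
            }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Subscribes to a source and collects everything it emits. */
    static <T> List<T> collect(Source<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T value) { out.add(value); }
            @Override public void onError(Throwable error) { throw new RuntimeException(error); }
            @Override public void onCompleted() { }
        });
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 2, 4, 6, 8]
        System.out.println(collect(filter(range(0, 10), i -> i % 2 == 0)));
    }
}
```

Note that in this sketch `filter` subscribes only to its upstream. The values that pass through `onNext` are merely inspected, so if those values were themselves sources, nothing here would subscribe to them.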

Let's take a look at the following code block which extracts only even numbers from range(0, 10):

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);

My questions are:

  1. Does this mean the filter operator already subscribes to every group produced by groupBy, or only to the outer Observable<GroupedObservable>?

  2. Will there be a memory leak in this case? If so,

  3. How do I properly discard those groups? Should I replace filter with a custom operator that applies take(0) to the group and then returns Observable.empty()? You may ask why I don't just return take(0) directly: it's because filter doesn't necessarily follow right after groupBy; it can be anywhere in the chain and involve more complex conditions.

FuzzY asked Nov 25 '15 21:11



2 Answers

Apart from the memory leak, the current implementation may end up hanging completely due to internal request coordination problems.

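Note that with take(0), the group may be recreated over and over: once a group has been unsubscribed, the next item with the same key can open a new one. I'd use ignoreElements instead. It drops the values, so no items reach flatMap, and it stays subscribed, so the group itself isn't recreated all the time.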
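The difference can be illustrated with a toy model in plain Java. This is only a sketch, not RxJava code: `GroupRecreationSketch`, `Strategy`, `Group` and `countGroupsCreated` are hypothetical names, and the model assumes that a group whose consumer has unsubscribed is removed from the operator's key map, so the next value with that key opens a fresh group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: a hypothetical stand-in for the behavior described above, not RxJava code.
final class GroupRecreationSketch {

    /** What the consumer does with an unwanted group. */
    enum Strategy { TAKE_ZERO, IGNORE_ELEMENTS }

    /** One open group; remembers whether its consumer is still subscribed. */
    static final class Group {
        final int key;
        boolean subscribed = true;
        final List<Integer> delivered = new ArrayList<>();
        Group(int key) { this.key = key; }
    }

    /**
     * Routes 0 .. count - 1 into groups keyed by i % 2 and returns how many
     * group objects were created in total. Groups with an odd key are unwanted.
     */
    static int countGroupsCreated(int count, Strategy strategy) {
        Map<Integer, Group> open = new HashMap<>();
        int created = 0;
        for (int i = 0; i < count; i++) {
            int key = i % 2;
            Group g = open.get(key);
            if (g == null) {
                g = new Group(key);
                open.put(key, g);
                created++;
                if (key % 2 != 0 && strategy == Strategy.TAKE_ZERO) {
                    g.subscribed = false; // models take(0): unsubscribe immediately
                }
            }
            if (!g.subscribed) {
                open.remove(key);   // assumption: a cancelled group is forgotten
            } else if (key % 2 == 0) {
                g.delivered.add(i); // wanted group: the value is passed on
            }
            // Otherwise: models ignoreElements, which stays subscribed and drops the value.
        }
        return created;
    }

    public static void main(String[] args) {
        // prints take(0): 6
        System.out.println("take(0): " + countGroupsCreated(10, Strategy.TAKE_ZERO));
        // prints ignoreElements: 2
        System.out.println("ignoreElements: " + countGroupsCreated(10, Strategy.IGNORE_ELEMENTS));
    }
}
```

Under that assumption, the take(0) variant creates a new odd group for every odd value (five of them for 0 to 9, plus the single even group), while the ignoreElements variant creates exactly one group per key.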

akarnokd answered Oct 02 '22 15:10


Your suspicions are correct: to handle the grouped observable properly, each of the inner observables (g) must be subscribed to. Because filter subscribes only to the outer observable, using it to discard groups is a bad idea. Instead, do the selection in flatMap and use ignoreElements to drop the unwanted groups.

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
        if (g.getKey() % 2 == 0) {
            return g;                  // wanted group: pass its items through
        } else {
            return g.ignoreElements(); // unwanted group: stay subscribed, drop its items
        }
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

Dave Moten answered Oct 02 '22 17:10