
OutOfMemoryError when sending a high volume of POSTs with Retrofit 2 and RxJava 2

I have an app with a local database (Room) and a service that POSTs all the "events" from the database using Retrofit 2 and RxJava. When I send a high volume of POSTs (i.e. 1500+), the app throws an OutOfMemoryError. I presume this happens because a new thread is started every time the client sends a POST. Is there a way to prevent Retrofit/RxJava from creating so many threads? Or is it better to wait for the server to respond before sending the next one? Here is my code:

Class that retrieves all the events from the local db

public class RetreiveDbContent {

    private final EventDatabase eventDatabase;

    public RetreiveDbContent(EventDatabase eventDatabase) {
        this.eventDatabase = eventDatabase;
    }

    @Override
    public Maybe<List<Event>> eventsList() {
        return eventDatabase.eventDao().getAllEvents()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }
}

Next, I have a service that iterates through the list of db events and posts all of them. If the backend responds with success, that event is deleted from the local db.

    private void sendDbContent() {

    mRetreiveDbContent.eventsList()
            .subscribe(new MaybeObserver<List<Event>>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(final List<Event> events) {


            Timber.e("Size of list from db " + events.size());
            final CompositeDisposable disposable = new CompositeDisposable();

            Observable<Event> eventObservable = Observable.fromIterable(events);
            eventObservable.subscribe(new Observer<Event>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Event event) {
                    Timber.d("sending event from db " + event.getAction());
                    mPresenter.postEvent(event);
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("error while emitting db content " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Timber.d("Finished looping through db list");
                    disposable.dispose();
                }
            });

        }

        @Override
        public void onError(Throwable e) {
            Timber.e("Error occurred while attempting to get db content " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Timber.d("Finished getting the db content");
        }
    });
}

These are my postEvent() and deleteEventFromRoom() methods, which live in a presenter:

    public void postEvent(final Event event) {

    mSendtEvent.sendEvent(event)
          .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<Response<ResponseBody>>() {
                @Override
                public void onNext(Response<ResponseBody> responseBodyResponse) {

                    switch (responseBodyResponse.code()) {
                        case CREATED_RESPONSE:
                            Timber.d("Event posted successfully " + responseBodyResponse.code());
                            deleteEventFromRoom(event);
                            break;
                        case BAD_REQUEST:
                            Timber.e("Client sent a bad request! We need to discard it!");
                            break;
                    }
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("Error " + e.getMessage());
                    mView.onErrorOccurred();
                }

                @Override
                public void onComplete() {

                }
            });
}


    public void deleteEventFromRoom(final Event event) {

    final CompositeDisposable disposable = new CompositeDisposable();
    mRemoveEvent.removeEvent(event)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Object o) {
                    Timber.d("Successfully deleted event from database " + event.getAction());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    disposable.dispose();
                }
            });
}

And finally, the mRemoveEvent interactor:

public class RemoveEvent {

    private final EventDatabase eventDatabase;

    public RemoveEvent(EventDatabase eventDatabase) {
        this.eventDatabase = eventDatabase;
    }

    @Override
    public Observable removeEvent(final Event event) {
        return Observable.fromCallable(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return eventDatabase.eventDao().delete(event);
            }
        });
    }
}

Note: I'm a newbie in the RxJava world. Thank you in advance.

asked Dec 14 '25 18:12 by android enthusiast


1 Answer

You are using Observable which does not support backpressure.

From the RxJava GitHub page:

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, a so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated for the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.
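To make the "steps can express how many items they are ready to process" idea concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API (Java 9+) rather than RxJava. The class and method names (BackpressureSketch, OneAtATimeSubscriber, run) are made up for illustration, and the sketch demonstrates the request(n) protocol only; it is not Android-ready code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {

    /** Hypothetical subscriber that asks the publisher for one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // signal demand for the first item only
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);      // "process" the item
            subscription.request(1); // only now ask for the next one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..count and returns what the subscriber received. */
    static List<Integer> run(int count) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Small per-subscriber buffer: submit() blocks the producer when it is full,
            // so the producer cannot run arbitrarily far ahead of the consumer.
            try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(executor, 4)) {
                publisher.subscribe(subscriber);
                for (int i = 1; i <= count; i++) {
                    publisher.submit(i);
                }
            } // close() delivers onComplete after the buffered items
            subscriber.done.await();
            return subscriber.received;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received " + run(20).size() + " items");
    }
}
```

The point of the sketch is that the consumer, not the producer, sets the pace: nothing new is delivered until request(1) is called again, and the bounded buffer keeps memory use flat no matter how many items the producer wants to send.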

You should use Flowable. Right now you are pushing all the events downstream at once, so they are all processed concurrently with whatever resources are available.

Here is a simple example:

Flowable.range(1, 1000)
        .buffer(10) // Optional: you can also process a single event at a time
        .flatMap(buf -> {
            System.out.println(String.format("100ms for sending events to server: %s ", buf));
            Thread.sleep(100);
            return Flowable.fromIterable(buf);
        }, 1) // <-- How many concurrent tasks should be executed
        .map(x -> x + 1)
        .doOnNext(i -> System.out.println(String.format("doOnNext: %d", i)))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single(), false, 1) // Overrides the default buffer size of 128
        .subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                // Ask for the first item only
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                System.out.println(String.format("Received response from server for event : %d", t));
                System.out.println("Processing value would take some time");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // You can request more data here
                request(1);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("ExampleUnitTest.onComplete");
            }
        });
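The 1 passed to flatMap above caps how many sends run at once. The same idea can be sketched without RxJava: a fixed-size thread pool guarantees that at most maxConcurrency "POSTs" are in flight, however many events are queued. Everything here (BoundedSenderSketch, fakePost, sendAll) is a hypothetical stand-in, with fakePost sleeping instead of making a real network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedSenderSketch {
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();

    /** Hypothetical stand-in for the network call; always reports success. */
    boolean fakePost(int eventId) throws InterruptedException {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // remember the peak concurrency
        try {
            Thread.sleep(5); // pretend the server takes a moment
            return true;
        } finally {
            inFlight.decrementAndGet();
        }
    }

    /** Sends every event using at most maxConcurrency worker threads. */
    int sendAll(List<Integer> eventIds, int maxConcurrency) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency);
        try {
            List<Callable<Boolean>> tasks = new ArrayList<>();
            for (int id : eventIds) {
                tasks.add(() -> fakePost(id));
            }
            int sent = 0;
            // invokeAll blocks until every task has finished
            for (Future<Boolean> result : pool.invokeAll(tasks)) {
                if (result.get()) {
                    sent++;
                }
            }
            return sent;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        BoundedSenderSketch sender = new BoundedSenderSketch();
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            ids.add(i);
        }
        int sent = sender.sendAll(ids, 3);
        System.out.println("sent=" + sent + ", peak in flight=" + sender.maxInFlight.get());
    }
}
```

Note that this only bounds the number of threads and open requests. The full task list still sits in memory, which is why the paging advice that follows matters as well.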

And a last tip: you should not fetch all the events into memory at once. As written, you are holding every database event in memory. Consider paging or something like a Cursor: fetch 100 rows per operation and, after processing them, request the next 100. I also hope you're doing this with the JobScheduler or WorkManager API.
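The paging advice can be sketched roughly as follows. This is plain Java with an in-memory list standing in for the Room table; FakeEventStore, pageAfter and fakePost are hypothetical names, and fakePost pretends that every tenth event is rejected so the loop has to cope with rows that stay in the table. In a real DAO, pageAfter would be a query along the lines of "WHERE id > :lastId ORDER BY id LIMIT :limit", which is an assumption about the schema.

```java
import java.util.ArrayList;
import java.util.List;

class PagedDrainSketch {

    /** Hypothetical stand-in for the Room DAO: an in-memory table of event ids. */
    static final class FakeEventStore {
        private final List<Integer> rows = new ArrayList<>();
        int largestPageServed = 0;

        FakeEventStore(int count) {
            for (int id = 1; id <= count; id++) {
                rows.add(id);
            }
        }

        /** Returns at most 'limit' rows whose id is greater than lastId, in id order. */
        List<Integer> pageAfter(int lastId, int limit) {
            List<Integer> page = new ArrayList<>();
            for (Integer id : rows) {
                if (id > lastId) {
                    page.add(id);
                    if (page.size() == limit) {
                        break;
                    }
                }
            }
            largestPageServed = Math.max(largestPageServed, page.size());
            return page;
        }

        void delete(Integer id) {
            rows.remove(id); // List.remove(Object): removes the row with this id
        }

        int size() {
            return rows.size();
        }
    }

    /** Hypothetical POST: pretend the server rejects every tenth event. */
    static boolean fakePost(int id) {
        return id % 10 != 0;
    }

    /** Sends events one page at a time; returns how many were sent and deleted. */
    static int drain(FakeEventStore store, int pageSize) {
        int sent = 0;
        int lastId = 0;
        while (true) {
            List<Integer> page = store.pageAfter(lastId, pageSize);
            if (page.isEmpty()) {
                return sent;
            }
            for (Integer id : page) {
                if (fakePost(id)) {
                    store.delete(id); // delete only after a successful send
                    sent++;
                }
                lastId = id; // advance the cursor so failed rows are not re-read forever
            }
        }
    }

    public static void main(String[] args) {
        FakeEventStore store = new FakeEventStore(250);
        int sent = drain(store, 100);
        System.out.println("sent=" + sent + ", left in store=" + store.size());
    }
}
```

Tracking lastId instead of always re-reading "the first 100 rows" keeps the loop from spinning on events the server refused; those rows simply stay in the table for a later retry, and at most one page is ever held in memory.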

answered Dec 16 '25 08:12 by M. Reza Nasirloo


