
Using the Consumer interface of ReactiveX

I'm new to ReactiveX and have been learning it by reading source code. Everything was clear until I came across an interface named "Consumer", which was used in place of an Observer.

Can someone explain what exactly it does?

I followed several links, but they all gave the same one-line statement: "Consumer is a functional interface (callback) that accepts a single value."

I want to know exactly how it works.

  1. What is it?
  2. Why do we need it?
  3. How do you use it?
  4. Does it take the place of Observer? If YES, how and why?
Abhishek Kumar asked Sep 08 '17 12:09





2 Answers

Consumer is a simple Java interface with a single method, accept, that takes a value of type T. As you said, it is used for callbacks.

Example:

import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;

Flowable.just("Hello world").subscribe(new Consumer<String>() {
    // Called once for each item the Flowable emits
    @Override public void accept(String s) {
        System.out.println(s);
    }
});

Why does it work? How can we use a Consumer instead of an Observer?

RxJava simply creates an Observer and passes the Consumer to it; the Consumer is then called from that Observer's onNext.

Update

  • You call Observable.subscribe(Consumer onNext)

  • Observable.subscribe(Consumer onNext, Consumer onError, Action onComplete, Consumer onSubscribe) gets called

  • LambdaObserver is created

LambdaObserver is an Observer built from four functional interfaces, which it uses as its callbacks. It exists mostly so that you can use Java 8 lambda expressions. A call that supplies all four looks like this:

Observable.just(new Object())
        .subscribe(
                o -> processOnNext(o),                  // onNext
                throwable -> processError(throwable),   // onError
                () -> processCompletion(),              // onComplete
                disposable -> processSubscription()     // onSubscribe
        );
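The wrapping described above can be sketched without RxJava at all. The following is a simplified illustration, not RxJava's actual source: SimpleObserver, CallbackObserver, emitTo and subscribe are hypothetical names, java.util.function.Consumer stands in for io.reactivex.functions.Consumer, and onSubscribe is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaObserverSketch {

    // Hypothetical stand-in for RxJava's Observer (onSubscribe omitted).
    interface SimpleObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    // Hypothetical analogue of LambdaObserver: adapts plain callbacks
    // to the observer interface by forwarding each event to one of them.
    static final class CallbackObserver<T> implements SimpleObserver<T> {
        private final Consumer<T> onNext;
        private final Consumer<Throwable> onError;
        private final Runnable onComplete;

        CallbackObserver(Consumer<T> onNext, Consumer<Throwable> onError, Runnable onComplete) {
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
        }

        @Override public void onNext(T value) { onNext.accept(value); }
        @Override public void onError(Throwable error) { onError.accept(error); }
        @Override public void onComplete() { onComplete.run(); }
    }

    // Hypothetical single-item source, loosely modelled on Observable.just(...).
    static <T> void emitTo(T item, SimpleObserver<T> observer) {
        observer.onNext(item);
        observer.onComplete();
    }

    // Convenience overload: wraps a lone Consumer in an observer,
    // using do-nothing callbacks for the events the caller did not supply.
    static <T> void subscribe(T item, Consumer<T> onNext) {
        emitTo(item, new CallbackObserver<T>(onNext, e -> {}, () -> {}));
    }

    // Subscribes with only an onNext callback and records what it receives.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        subscribe("Hello world", s -> log.add("onNext: " + s));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The caller only ever hands over a Consumer; the source still talks to a full observer, which is why a Consumer can appear where an Observer is expected.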
Amaksoft answered Oct 02 '22 01:10



A Consumer consumes the values you receive when subscribing. It's like a Subscriber that receives the emitted data through a callback.

Consumer is a simple interface with a single callback for a generic type; it is what receives the items emitted by the Observable.

Be aware that if you pass only a single Consumer, errors are not handled by your code, which can make debugging difficult.

You can solve that by passing another Consumer as the second parameter, which receives a Throwable.

Flowable.just("Hello world")
        .subscribe(
                emittedData -> System.out.println(emittedData), // onNext
                throwable -> throwable.printStackTrace()        // onError
        );
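To see why the second Consumer matters, here is a minimal plain-Java sketch of the idea, assuming a hypothetical deliver helper that is not part of RxJava and using java.util.function.Consumer in place of RxJava's own interface. It mimics the routing only: items go to the first callback, and a failure goes to the second.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ErrorConsumerSketch {

    // Hypothetical helper: hands each item to onNext and routes any
    // runtime failure to onError. Delivery stops at the first error.
    static <T> void deliver(List<T> items, Consumer<T> onNext, Consumer<Throwable> onError) {
        try {
            for (T item : items) {
                onNext.accept(item);
            }
        } catch (RuntimeException e) {
            onError.accept(e);
        }
    }

    // Parses each string as an integer; "two" fails and reaches onError.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        deliver(
                Arrays.asList("1", "two", "3"),
                s -> log.add("onNext: " + Integer.parseInt(s)),
                e -> log.add("onError: " + e.getClass().getSimpleName()));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the failure is recorded by the error callback instead of escaping as an uncaught exception, and the item after the failing one is never delivered.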
Emanuel S answered Oct 02 '22 02:10
