Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

kotlin getting a subscriber to observe an observable using RxJava2

Android Studio 3.0 Beta2

I have created 2 methods one that creates the observable and another that creates the subscriber.

However, I am having a issue try to get the subscriber to subscribe to the observable. In Java this would work, and I am trying to get it to work in Kotlin.

In my onCreate(..) method I am trying to set this. Is this the correct way to do this?

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */
        createStringObservable().subscribe(createStringSubscriber())
    }


    fun createStringObservable(): Observable<String> {
        val myObservable: Observable<String> = Observable.create {
            subscriber ->
            subscriber.onNext("Hello, World!")
            subscriber.onComplete()
        }

        return myObservable
    }

    fun createStringSubscriber(): Subscriber<String> {
        val mySubscriber = object: Subscriber<String> {
            override fun onNext(s: String) {
                println(s)
            }

            override fun onComplete() {
                println("onComplete")
            }

            override fun onError(e: Throwable) {
                println("onError")
            }

            override fun onSubscribe(s: Subscription?) {
                println("onSubscribe")
            }
        }

        return mySubscriber
    }
}

Many thanks for any suggestions,

like image 371
ant2009 Avatar asked Aug 18 '17 17:08

ant2009


People also ask

What is an observable in RxJava?

What is an Observable? In RxJava, Observable s are the source that emits data to the Observers. We can understand observable s as suppliers — they process and supply data to other components. It does some work and emits some values. The following are the different types of Observable s in RxJava

What is RxJava and how does it work?

We can understand RxJava as data emitted by one component, called Observable, and the underlying structure provided by the Rx libraries will propagate changes to another component, Observer. Simply put, it’s an API for asynchronous programming with observable streams.

What is RxJava – multi-threading in Android?

RxJava — Multi-Threading in Android helps to understand the basics of Rx, everything about Observable s, Observer s, Scheduler s, etc. So, hoping that you already know about basics of RxJava lets start by discussing Observable. What is an Observable? In RxJava, Observable s are the source that emits data to the Observers.

What is a subscriber in observable?

A subscriber is an argument of an Observable constructor and this is responsible for sending values / error back to observer or notifying for completion. A subscriber can next, error or completion method. (new Observable((subscriber) => { subscriber.next('hello world'); subscriber.error(new Error('Something is wrong')); subscriber.complete(); }));


3 Answers

pay close attention to the types.

Observable.subscribe() has three basic variants:

  • one that accepts no arguments
  • several that accept an io.reactivex.functions.Consumer
  • one that accepts an io.reactivex.Observer

the type you're attempting to subscribe with in your example is org.reactivestreams.Subscriber (defined as part of the Reactive Streams Specification). you can refer to the docs to get a fuller accounting of this type, but suffice to say it's not compatible with any of the overloaded Observable.subscribe() methods.

here's a modified example of your createStringSubscriber() method that will allow your code to compile:

fun createStringSubscriber(): Observer<String> {
        val mySubscriber = object: Observer<String> {
            override fun onNext(s: String) {
                println(s)
            }

            override fun onComplete() {
                println("onComplete")
            }

            override fun onError(e: Throwable) {
                println("onError")
            }

            override fun onSubscribe(s: Disposable) {
                println("onSubscribe")
            }
        }

        return mySubscriber
    }

the things changed are:

  1. this returns an Observer type (instead of Subscriber)
  2. onSubscribe() is passed a Disposable (instead of Subscription)

.. and as mentioned by 'Vincent Mimoun-Prat', lambda syntax can really shorten your code.

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // Here's an example using pure RxJava 2 (ie not using RxKotlin)
        Observable.create<String> { emitter ->
            emitter.onNext("Hello, World!")
            emitter.onComplete()
        }
                .subscribe(
                        { s -> println(s) },
                        { e -> println(e) },
                        {      println("onComplete") }
                )

        // ...and here's an example using RxKotlin. The named arguments help
        // to give your code a little more clarity
        Observable.create<String> { emitter ->
            emitter.onNext("Hello, World!")
            emitter.onComplete()
        }
                .subscribeBy(
                        onNext     = { s -> println(s) },
                        onError    = { e -> println(e) },
                        onComplete = {      println("onComplete") }
                )
    }

i hope that helps!

like image 73
homerman Avatar answered Sep 27 '22 21:09

homerman


Have a look at RxKotlin, that will simplify a lot of things and make code more concise.

val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

list.toObservable() // extension function for Iterables
        .filter { it.length >= 5 }
        .subscribeBy(  // named arguments for lambda Subscribers
                onNext = { println(it) },
                onError =  { it.printStackTrace() },
                onComplete = { println("Done!") }
        )
like image 23
Vincent Mimoun-Prat Avatar answered Sep 27 '22 23:09

Vincent Mimoun-Prat


val observer = object: Observer<Int> {
    override fun onNext(t: Int) {
        // Perform the value of `t`
    }
    override fun onComplete() {
        // Perform something on complete
    }
    override fun onSubscribe(d: Disposable) {
        // Disposable provided
    }
    override fun onError(e: Throwable) {
        // Handling error
    }
}
like image 25
Sana Ebadi Avatar answered Sep 27 '22 21:09

Sana Ebadi