I am trying to understand the details of RxJava.
Intuitively I expected first() and take(1) to be equal and do the same. However, digging into the source code, I found that first() is defined as take(1).single().
What is the single() here good for? Doesn't take(1) already guarantee to output a single item?
The first() method returns a Row object while take() returns a list. But as soon as we convert our RDD to a DataFrame using .
The RxJS first() operator is generally used when you are only interested in the first item emitted by the source observable or the first item that meets some criteria. In this case, you can use this operator to filter the Observable.
take returns an Observable that emits only the first count values emitted by the source Observable. If the source emits fewer than count values, then all of its values are emitted. After that, it completes, regardless of whether the source completes.
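The "fewer than count values" rule can be illustrated without any reactive library. The following is only an analogy in plain Java, with Stream.limit standing in for take; the class TakeSketch and its take helper are made-up names for illustration and are not part of RxJS or RxJava.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class TakeSketch {
    // Hypothetical helper: keeps at most `count` leading items, similar in spirit to take(count).
    static <T> List<T> take(List<T> source, long count) {
        return source.stream().limit(count).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(take(Arrays.asList(1, 2, 3), 2));             // more items than count: [1, 2]
        System.out.println(take(Arrays.asList(1), 2));                   // fewer items than count: [1]
        System.out.println(take(Collections.<Integer>emptyList(), 2));   // empty source: []
    }
}
```

Note that a list is finite and synchronous, so this models only which items get through, not the completion timing of an Observable.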
The of Operator is a creation Operator. Creation Operators are functions that create an Observable stream from a source. The of Operator will create an Observable that emits a variable amount of values in sequence, followed by a Completion notification.
The difference is that take(1) will relay 0..1 items from upstream, whereas first will relay the very first element or emit an error (NoSuchElementException) if the upstream is empty. Neither of them is blocking.
It is true that first == take(1).single(), where take(1) limits the number of upstream items to 1 and single() makes sure upstream isn't empty.
This example prints "Done" only:
Observable.empty().take(1)
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example prints "1" followed by "Done":
Observable.just(1).take(1)
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example also prints "1" followed by "Done":
Observable.just(1, 2, 3).take(1)
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example fails with NoSuchElementException:
Observable.empty().first()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example, again, prints "1" followed by "Done":
Observable.just(1).first()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example, again, prints "1" followed by "Done":
Observable.just(1, 2, 3).first()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example prints a stacktrace of NoSuchElementException because the source contained too few elements:
Observable.empty().single()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
This example prints a stacktrace of IllegalArgumentException because the source contained too many elements:
Observable.just(1, 2, 3).single()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Done"));
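To see why first() needs the single() step, the three operators can be modelled on plain lists. This is a simplified sketch in plain Java, not RxJava's implementation: the class FirstSketch and its methods are hypothetical names, lists stand in for streams, a thrown exception stands in for an onError signal, and the exception messages are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

class FirstSketch {
    // Models take(1): relays 0..1 items and never fails.
    static <T> List<T> take1(List<T> source) {
        return source.isEmpty()
                ? Collections.<T>emptyList()
                : Collections.singletonList(source.get(0));
    }

    // Models single(): exactly one item is required, anything else is an error.
    static <T> T single(List<T> source) {
        if (source.isEmpty()) {
            throw new NoSuchElementException("source is empty");
        }
        if (source.size() > 1) {
            throw new IllegalArgumentException("source has too many elements");
        }
        return source.get(0);
    }

    // Models first() as take(1) followed by single().
    static <T> T first(List<T> source) {
        return single(take1(source));
    }

    public static void main(String[] args) {
        System.out.println(take1(Collections.<Integer>emptyList())); // empty in, empty out, no error
        System.out.println(first(Arrays.asList(1, 2, 3)));           // take1 trims to one item, single passes it on
        try {
            first(Collections.<Integer>emptyList());
        } catch (NoSuchElementException e) {
            System.out.println("first on empty source failed: " + e.getMessage());
        }
    }
}
```

In this model, take1 alone never fails, and the "too many elements" branch of single can never be reached from first because take1 has already trimmed the input. What single contributes to first is the empty-source check.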