I am using RxJava and I have an Observable
with multiple items inside. What I would like to do is run function A on the first item, function B on all of them and function C when the Observable
is completed:
-----1-----2-----3-----|-->
| | | |
run A | | |
| | | |
run B run B run B |
|
run C
is there a clever way of expressing this with lambda functions? I have the following solution already, but it looks ugly and I suspect that there is a better way to do this:
observable.subscribe(
new Action1<Item>() {
boolean first = true;
@Override
public void call(Item item) {
if (first) {
runA(item);
first = false;
}
runB(fax1);
}
},
throwable -> {},
() -> runC());
Use Observable.defer
to encapsulate per subscription state (being a boolean that indicates if we are on the first record).
Here's a runnable class that demos use:
import rx.Observable;
import rx.Observable.Transformer;
import rx.functions.Action1;
public class DoOnFirstMain {
public static void main(String[] args) {
Observable<Integer> o =
Observable.just(1, 2, 3)
.compose(doOnFirst(System.out::println);
// will print 1
o.subscribe();
// will print 1
o.subscribe();
}
public static <T> Transformer<T, T> doOnFirst(Action1<? super T> action) {
return o -> Observable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return o.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
action.call(t);
}
});
});
}
}
Even though OP was asking about RxJava1, here's the same solution above but for RxJava2:
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.functions.Consumer;
public class DoOnFirstMain {
public static void main(String[] args) {
Flowable<Integer> f =
Flowable.just(1, 2, 3)
.compose(doOnFirst(System.out::println);
// will print 1
f.subscribe();
// will print 1
f.subscribe();
}
public static <T> FlowableTransformer<T, T> doOnFirst(Consumer<? super T> consumer) {
return f -> Flowable.defer(() -> {
final AtomicBoolean first = new AtomicBoolean(true);
return f.doOnNext(t -> {
if (first.compareAndSet(true, false)) {
consumer.accept(t);
}
});
});
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With