
How to properly handle onError on my EventBus using RxJava

I am using RxJava with Android, and my event bus class is as follows:

public class EventBus {
    private final Subject<Event, Event> subject = new SerializedSubject<>(PublishSubject.<Event>create());
    private Observable<Map<Type, Event>> stickyObservable;

    public EventBus() {
        createStickyObservable();
    }

    private void createStickyObservable() {
        final List<Observable<Event>> observables = new ArrayList<>();

        final Observable<Map<Type, Event>> so = subject
                .filter(event -> event.sticky)
                .groupBy(event -> event.type)
                .switchMap(groupedObservable -> {
                    BehaviorSubject<Event> bs = BehaviorSubject.create();
                    groupedObservable.subscribe(bs);
                    observables.add(bs);
                    return Observable.combineLatest(observables, args -> {
                        Map<Type, Event> map = new HashMap<>();
                        for (Object arg : args) {
                            Event event = (Event) arg;
                            map.put(event.type, event);
                        }

                        return map;
                    });
                });

        final BehaviorSubject<Map<Type, Event>> bs = BehaviorSubject.create();
        so.subscribe(bs);
        stickyObservable = bs;
    }

    public Observable<Event> filter(final String pathExpression) {
        final Pattern pattern = Pattern.compile(pathExpression);

        return subject.filter(event -> {
            if (event.path == null) {
                return pathExpression == null;
            }
            return pattern.matcher(event.path).matches();
        });
    }

    public Observable<Map<Type, Event>> getStickyObservable() {
        return stickyObservable;
    }

    public void event(Event event) {
        subject.onNext(event);
    }
}

I am getting a lot of error logs with rx.exceptions.OnErrorNotImplementedException.

How can I fix this? Please suggest a solution to this problem.

Renjith asked Dec 06 '25 06:12


1 Answer

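When you subscribe to getStickyObservable(), you need to supply an onError handler. Don't use, for example, the single-argument .subscribe(action) overload: it has no error callback, so any error that reaches that subscriber is rethrown as OnErrorNotImplementedException.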
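To illustrate the difference between the two overloads, here is a minimal sketch in plain Java. MiniSubject and OnErrorNotImplemented are hypothetical stand-ins written for this example, not RxJava classes; they only mimic the behaviour described above, namely that a subscriber without an error callback rethrows, while one with an error callback receives the error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in types; NOT part of RxJava. They only mimic what
// happens to an error when a subscriber did or did not supply onError.
class OnErrorDemo {

    // Local stand-in for rx.exceptions.OnErrorNotImplementedException.
    static class OnErrorNotImplemented extends RuntimeException {
        OnErrorNotImplemented(Throwable cause) { super(cause); }
    }

    static class MiniSubject<T> {
        private final List<Consumer<T>> onNexts = new ArrayList<>();
        private final List<Consumer<Throwable>> onErrors = new ArrayList<>();

        // One-argument overload: no error handler, so errors are rethrown.
        void subscribe(Consumer<T> onNext) {
            subscribe(onNext, e -> { throw new OnErrorNotImplemented(e); });
        }

        // Two-argument overload: the caller decides what to do with errors.
        void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
            onNexts.add(onNext);
            onErrors.add(onError);
        }

        void onNext(T value) {
            for (Consumer<T> c : onNexts) c.accept(value);
        }

        void onError(Throwable e) {
            for (Consumer<Throwable> c : onErrors) c.accept(e);
        }
    }

    // Returns true if delivering an error to a one-argument subscriber throws.
    static boolean failsWithoutHandler() {
        MiniSubject<String> subject = new MiniSubject<>();
        subject.subscribe(value -> { });
        try {
            subject.onError(new IllegalStateException("boom"));
            return false;
        } catch (OnErrorNotImplemented e) {
            return true;
        }
    }

    // Returns the error messages seen by a subscriber that supplied onError.
    static List<String> handledErrors() {
        List<String> seen = new ArrayList<>();
        MiniSubject<String> subject = new MiniSubject<>();
        subject.subscribe(value -> { }, error -> seen.add(error.getMessage()));
        subject.onError(new IllegalStateException("boom"));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("without handler throws: " + failsWithoutHandler());
        System.out.println("with handler received: " + handledErrors());
    }
}
```

With RxJava itself, the same shape would be getStickyObservable().subscribe(map -> ..., error -> ...), where the second lambda is the place to log or recover from the error.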

Dave Moten answered Dec 08 '25 20:12


