Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Rx Java 2: How to wrap a callback?

I have this code to wrap a callback in Rx Java 1 and it compiles fine , but now that I have switched to RX Java 2 it does not compile...what is the equivalent in Rx Java 2?

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
            @Override
            public void call(AsyncEmitter<Integer> emitter) {

                transObs.setTransferListener(new TransferListener() {
                    @Override
                    public void onStateChanged(int id, TransferState state) {
                        if (state == TransferState.COMPLETED)
                            emitter.onCompleted();
                    }

                    @Override
                    public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {

                    }

                    @Override
                    public void onError(int id, Exception ex) {
                        emitter.onError(ex);
                    }
                });

                emitter.setCancellation(new AsyncEmitter.Cancellable() {
                    @Override
                    public void cancel() throws Exception {

                        transObs.cleanTransferListener();
                    }
                });
            }
        }, AsyncEmitter.BackpressureMode.BUFFER);

UPDATE:

I came up with this, but Do you have to deal with backpressure since its an oncreate call?

 return Observable.create(new ObservableOnSubscribe<List<DigitsUser>>() {

        @Override
        public void subscribe(final ObservableEmitter<List<DigitsUser>> emitter) throws Exception {

            mDigitFriends.findFriends((gotEm, users) -> {
                emitter.onNext(users);
            });

            emitter.setCancellable(() -> {
                emitter.onNext(null);
            });
        }
    });
like image 1000
Mike6679 Avatar asked Apr 05 '17 13:04

Mike6679


1 Answers

If you're worried about backpressure you should use the Flowable class. Here's a quote from the RxJava2 Wiki:

Practically, the 1.x fromEmitter (formerly fromAsync) has been renamed to Flowable.create.

Here is your example using the Flowable class:

 return Flowable.create(new FlowableEmitter<List<DigitsUser>>() {

        @Override
        public void subscribe(final FlowableEmitter<List<DigitsUser>> emitter) throws Exception {

            mDigitFriends.findFriends((gotEm, users) -> {
                emitter.onNext(users);
            });

            emitter.setCancellable(() -> {
                emitter.onNext(null);
            });
        }
    }, BackpressureStrategy.BUFFER);
like image 140
ajplumlee33 Avatar answered Nov 13 '22 21:11

ajplumlee33