
How do I sequentially loop an observable in RxSwift?

I am trying to create a stream that polls a network service. At the moment it queries the service, then completes after a short delay. I'd like the stream to restart instead of completing, so that it polls the service forever.

You could do something like ...

myPollingStream.repeat()

But repeat in RxSwift is actually repeatElement, so it generates a stream of observables rather than resubscribing to the original one. You could possibly concatMap these into a flattened serial sequence, but RxSwift does not have a concatMap operator.

So how do I loop an observable in RxSwift?

I'd like the requests to be sequential, not concurrent, so flatMap is not an option: it merges the inner streams, which leads to overlapping requests. I'm looking for something similar to how retry() works, but restarting on completion rather than on error.

Brendan asked Feb 08 '17 14:02



1 Answer

Observable.repeatElement(myPollingStream, scheduler: MainScheduler.instance).concat()
  • repeatElement(_:scheduler:) creates an infinite stream that emits the polling observable over and over.
  • concat() then makes sure each polling query has completed before subscribing to the next.

Attention

While the above works in theory, without a backpressure implementation, repeatElement(_:scheduler:) will keep emitting events until you eventually run out of memory. This makes the solution not viable as of RxSwift 3.0. More details can be found in this issue on the RxSwift repository.
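One possible way around the eager emission is to hand concat a lazy Swift sequence instead of an observable of observables, so that the next copy of the stream is only pulled once the previous one has completed. The following is a minimal sketch, not a tested production solution: repeatOnCompletion is a hypothetical helper name, and the deferred counter is only a stand-in for the real myPollingStream. It assumes the static Observable.concat overload that accepts a Swift Sequence of observables, and it has not been checked against the memory issue on every RxSwift version.

```swift
import RxSwift

// Hypothetical helper (not part of RxSwift): resubscribes to `source`
// each time it completes. `sequence(first:next:)` from the Swift standard
// library builds a lazy, infinite sequence that yields `source` forever.
func repeatOnCompletion<T>(_ source: Observable<T>) -> Observable<T> {
    return Observable<T>.concat(sequence(first: source, next: { $0 }))
}

// Stand-in for the real polling stream: every subscription counts as
// one "request" and emits the request number, then completes.
var requestCount = 0
let myPollingStream = Observable<Int>.deferred {
    requestCount += 1
    return Observable.just(requestCount)
}

// Collect the results of three sequential polls, then stop via take(3).
var received: [Int] = []
_ = repeatOnCompletion(myPollingStream)
    .take(3)
    .subscribe(onNext: { received.append($0) })

print(received)
```

In real use you would drop take(3) and keep the returned Disposable (for example in a DisposeBag) so that polling stops when the owner goes away. Errors from the source are not retried here; they terminate the whole sequence.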

tomahh answered Sep 28 '22 06:09