Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Convert Observable of List to List of Observables and merge in RxJava

I'm learning Java with Android by creating Hacker News reader app.

What I'm trying to do is:

  1. Send a request to /topstories, return Observable<List<int>>, emit when request finishes.
  2. Map each storyId to Observable<Story>
  3. Merge Observables into one entity, which emits List<Story>, when all requests finishes.

And to the code:

  private Observable<Story> getStoryById(int articleId) {
    BehaviorSubject<Story> subject = BehaviorSubject.create();
    // calls subject.onNext on success
    JsonObjectRequest request = createStoryRequest(articleId, subject);
    requestQueue.add(request);
    return subject;
  }

  public Observable<ArrayList<Story>> getTopStories(int amount) {
    Observable<ArrayList<Integer>> topStoryIds = (storyIdCache == null)
      ? fetchTopIds()
      : Observable.just(storyIdCache);

    return topStoryIds
      .flatMap(id -> getStoryById(id))
      // some magic here
  }

Then we would use this like:

getTopStories(20)
  .subscribe(stories -> ...)
like image 524
Selenir Avatar asked Mar 06 '23 17:03

Selenir


2 Answers

You can try something like that

Observable<List<Integers>> ids = getIdsObservable();
Single<List<Story>> listSingle =
            ids.flatMapIterable(ids -> ids)
            .flatMap(id -> getStoryById(id)).toList();

Then you can subscribe to that Single to get the List<Story>

like image 195
elmorabea Avatar answered Mar 10 '23 10:03

elmorabea


Please have a look at my solution. I changed your interface to return a Single for getStoryById(), because it should only return one value. After that, I created a for each Story a Single request and subscribed to all of them with Single.zip. Zip will execute given lambda, when all Singles are finished. On drawback is, that all requestes will be fired at once. If you do not want this, I will update my post. Please take into considerations that @elmorabea solution will also subscribe to the first 128 elements (BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));), and to the next element when one finishes.

@Test
  void name() {
    Api api = mock(Api.class);

    when(api.getTopStories()).thenReturn(Flowable.just(Arrays.asList(new Story(1), new Story(2))));
    when(api.getStoryById(eq(1))).thenReturn(Single.just(new Story(888)));
    when(api.getStoryById(eq(2))).thenReturn(Single.just(new Story(888)));

    Flowable<List<Story>> listFlowable =
        api.getTopStories()
            .flatMapSingle(
                stories -> {
                  List<Single<Story>> collect =
                      stories
                          .stream()
                          .map(story -> api.getStoryById(story.id))
                          .collect(Collectors.toList());

                  // possibly not the best idea to subscribe to all singles at the same time
                  Single<List<Story>> zip =
                      Single.zip(
                          collect,
                          objects -> {
                            return Arrays.stream(objects)
                                .map(o -> (Story) o)
                                .collect(Collectors.toList());
                          });

                  return zip;
                });

    TestSubscriber<List<Story>> listTestSubscriber =
        listFlowable.test().assertComplete().assertValueCount(1).assertNoErrors();

    List<List<Story>> values = listTestSubscriber.values();

    List<Story> stories = values.get(0);

    assertThat(stories.size()).isEqualTo(2);
    assertThat(stories.get(0).id).isEqualTo(888);
    assertThat(stories.get(1).id).isEqualTo(888);
  }

  interface Api {
    Flowable<List<Story>> getTopStories();

    Single<Story> getStoryById(int id);
  }

  static class Story {
    private final int id;

    Story(int id) {
      this.id = id;
    }
  }
like image 42
Hans Wurst Avatar answered Mar 10 '23 11:03

Hans Wurst