How To Do Recursive Observable Call in RxJava?

I am quite new to RxJava (and the reactive paradigm in general), so please bear with me.

Suppose I have this News and this nested Comment data structure:

public class News {
  public int id;
  public int[] commentIds; //only top level comments
  public News(int id, int[] commentIds) {
    this.id = id;
    this.commentIds = commentIds;
  }
}

public class Comment {
  public int id;
  public int parentId; //ID of parent News or parent comment
  public int[] childIds;
  public Comment(int id, int parentId, int[] childIds) {
    this.id = id;
    this.parentId = parentId;
    this.childIds = childIds;
  }
}

and suppose I have this API endpoint:

getComments(int commentId) //returns an Observable<Comment> that emits the Comment with ID commentId

Now, let's assume:

getComments(1); //will return Comment(1, 99, [3,4])
getComments(2); //will return Comment(2, 99, [5,6])
getComments(3); //will return Comment(3, 1, [])
getComments(4); //will return Comment(4, 1, [])
getComments(5); //will return Comment(5, 2, [])
getComments(6); //will return Comment(6, 2, [])

Now, if I have News n = News(99, [1,2]), how do I get all of its child comments recursively, i.e. the comments with IDs [1,2,3,4,5,6]?
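
For reference, a plain blocking walk over the same sample data pins down what a correct result looks like. This is only a sketch: the class name BlockingCommentWalk and the in-memory CHILDREN map are made up for illustration and stand in for the real getComments API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BlockingCommentWalk {
    // Hypothetical stand-in for the API: comment ID -> child IDs,
    // filled with the sample data from the question.
    static final Map<Integer, int[]> CHILDREN = new HashMap<>();
    static {
        CHILDREN.put(1, new int[] {3, 4});
        CHILDREN.put(2, new int[] {5, 6});
        CHILDREN.put(3, new int[] {});
        CHILDREN.put(4, new int[] {});
        CHILDREN.put(5, new int[] {});
        CHILDREN.put(6, new int[] {});
    }

    // Depth-first walk: record the comment, then recurse into each child.
    static void collect(int commentId, List<Integer> out) {
        out.add(commentId);
        for (int childId : CHILDREN.get(commentId)) {
            collect(childId, out);
        }
    }

    // Collects every comment reachable from the given top-level IDs.
    static List<Integer> allCommentIds(int[] topLevelIds) {
        List<Integer> out = new ArrayList<>();
        for (int id : topLevelIds) {
            collect(id, out);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(allCommentIds(new int[] {1, 2}));
    }
}
```

Run as is, this prints [1, 3, 4, 2, 5, 6]: the same six IDs as above, in depth-first order rather than sorted. A reactive version should emit the same set, though not necessarily in the same order.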

I have searched and stumbled upon this: https://jkschneider.github.io/blog/2014/recursive-observables-with-rxjava.html

This is the recursion function:

public class FileRecursion {
    static Observable<File> listFiles(File f) {
        if(f.isDirectory())
            return Observable.from(f.listFiles()).flatMap(FileRecursion::listFiles);
        return Observable.just(f);
    }

    public static void main(String[] args) {
          Observable.just(new File("/Users/joschneider/Desktop"))
                  .flatMap(FileRecursion::listFiles)
                  .subscribe(f -> System.out.println(f.getAbsolutePath()));
    }
}

It shows how to make recursive observable calls, but the inner function (f.listFiles()) is a blocking operation: it returns a plain array, not another Observable. In my case, the inner function (getComments) is non-blocking and returns another Observable. How do I do the same recursion with it?

Any help will be much appreciated.

asked Jul 06 '15 12:07 by HoverPhoenix


1 Answer

This does practically the same thing described in the article:

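import java.util.ArrayList;
import java.util.List;
import rx.Observable;

// Emits the given comment and, recursively, all of its descendants.
// getComments(int) is the API from the question, assumed callable from a static context.
static Observable<Comment> getInnerComments(Comment comment) {
    if (comment.childIds.length == 0)
        return Observable.just(comment);
    // Observable.from(T[]) does not accept a primitive int[], so box the IDs first.
    List<Integer> childIds = new ArrayList<>();
    for (int id : comment.childIds)
        childIds.add(id);
    return Observable.merge(
            Observable.just(comment),
            Observable.from(childIds)
                    .flatMap(id -> getComments(id))
                    .flatMap(child -> getInnerComments(child)));
}

public static void main(String[] args) {
    getComments(1)
          .flatMap(comment -> getInnerComments(comment))
          .subscribe(c -> System.out.println(c));
}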

I start with the comment with id = 1 and pass it to getInnerComments(), which checks whether the comment has children. If it does, it emits the comment itself, iterates over every child id (Observable#from), and loads each child with your getComments(int) API. Every child is then passed back to getInnerComments(), which repeats the same procedure. If a comment has no children, it is returned immediately using Observable#just, and that is where the recursion stops.
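
To try the recursion on the question's sample data without RxJava on the classpath, here is a sketch of the same shape using the JDK's CompletableFuture in place of Observable. It is an analogue, not RxJava code: thenCompose plays the role of flatMap and thenCombine stands in for merge. The class AsyncCommentWalk, the STORE map, and idsUnder are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AsyncCommentWalk {
    static final class Comment {
        final int id;
        final int parentId;
        final int[] childIds;
        Comment(int id, int parentId, int... childIds) {
            this.id = id;
            this.parentId = parentId;
            this.childIds = childIds;
        }
    }

    // Hypothetical in-memory data: the six sample comments from the question.
    static final Map<Integer, Comment> STORE = new HashMap<>();
    static {
        for (Comment c : new Comment[] {
                new Comment(1, 99, 3, 4), new Comment(2, 99, 5, 6),
                new Comment(3, 1), new Comment(4, 1),
                new Comment(5, 2), new Comment(6, 2)}) {
            STORE.put(c.id, c);
        }
    }

    // Stand-in for the non-blocking API: completes on another thread.
    static CompletableFuture<Comment> getComments(int commentId) {
        return CompletableFuture.supplyAsync(() -> STORE.get(commentId));
    }

    // Completes with the comment followed by all of its descendants.
    static CompletableFuture<List<Comment>> getInnerComments(Comment comment) {
        List<Comment> self = new ArrayList<>();
        self.add(comment);
        CompletableFuture<List<Comment>> result = CompletableFuture.completedFuture(self);
        for (int childId : comment.childIds) {
            // Load the child asynchronously, then recurse into its subtree.
            CompletableFuture<List<Comment>> subtree =
                    getComments(childId).thenCompose(AsyncCommentWalk::getInnerComments);
            // Append the subtree once both sides have completed.
            result = result.thenCombine(subtree, (acc, sub) -> {
                acc.addAll(sub);
                return acc;
            });
        }
        return result;
    }

    // Blocks only at the outer edge, to hand back plain IDs.
    static List<Integer> idsUnder(int commentId) {
        List<Integer> ids = new ArrayList<>();
        for (Comment c : getComments(commentId)
                .thenCompose(AsyncCommentWalk::getInnerComments)
                .join()) {
            ids.add(c.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(idsUnder(1));
    }
}
```

With the sample data, idsUnder(1) yields [1, 3, 4]. Unlike Observable.merge, which may interleave children as they arrive, this version always lists a parent first and then its subtrees in childIds order.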

This is pseudo-code and it wasn't tested, but you should get the idea.


Below is an example of how to get all comments and then aggregate them to one List<Comment>.

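getNews(99) // assumed API, not shown in the question: returns Observable<News>
        // commentIds is a primitive int[], which Observable.from() cannot take,
        // so emit the indices and look up each ID
        .flatMap(news -> Observable.range(0, news.commentIds.length)
                .map(i -> news.commentIds[i]))
        .flatMap(commentId -> getComments(commentId))
        .flatMap(comment -> getInnerComments(comment))
        .toList()
        .subscribe(commentList -> { /* every comment of the news item, in arrival order */ });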
answered Oct 02 '22 07:10 by Egor Neliuba