
RxJava -- Terminating Infinite Streams

I am exploring reactive programming and RxJava. It is fun, but I am stuck on a problem for which I cannot find an answer. My basic question: what is a reactive-appropriate way to terminate an otherwise infinitely-running Observable? I also welcome critiques and reactive best practices regarding my code.

As an exercise, I am writing a log file tail utility. The stream of lines in the log file is represented by an Observable<String>. To get the BufferedReader to continue reading text that is added to the file, I ignore the usual reader.readLine() == null termination check and instead interpret it to mean that my thread should sleep and wait for more logger text.

But while I can terminate the Observer using takeWhile, I need to find a clean way to terminate the otherwise infinitely-running file watcher. I can write my own terminateWatcher method/field, but that breaks the Observable/Observer encapsulation -- and I'd like to stay as faithful to the reactive paradigm as possible.

Here is the Observable<String> code:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Here is the Observer code that prints the new lines as they come:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
            System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

My two questions are:

  1. What is a reactive-consistent way to terminate an otherwise infinitely-running stream?
  2. What other mistakes in my code make you cry? :)
MonkeyWithDarts asked Dec 18 '13 16:12



1 Answer

Since you start watching the file when the subscription is requested, it makes sense to stop watching when the subscription ends, i.e., when the Subscription is unsubscribed. This also addresses one of the loose ends in your code comments: return a Subscription whose unsubscribe() tells the FileWatcher to stop watching the file. Then change your loop condition so that it checks whether the subscription has been canceled rather than whether the current thread has been interrupted.

The problem is that this isn't very helpful if your onSubscribe() method never returns: the caller never receives the Subscription, so it has nothing to unsubscribe from. Perhaps your FileWatcher should require a scheduler to be specified, with the reading occurring on that scheduler.
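Here is a minimal sketch of both suggestions together, written without the RxJava dependency so that it compiles on its own. The Observer and Subscription interfaces below are hand-written stand-ins for the RxJava types of the same name, the ExecutorService plays the role of the scheduler, and the two-argument FileWatcher constructor and the TailSketch class are hypothetical. Treat it as an illustration of the idea, not as the library's API.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-ins for the RxJava interfaces (assumption: simplified, not the real types).
interface Observer<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

interface Subscription {
    void unsubscribe();
}

class FileWatcher {
    private final Path path;
    private final ExecutorService executor; // stands in for a scheduler

    FileWatcher(Path path, ExecutorService executor) {
        this.path = path;
        this.executor = executor;
    }

    Subscription onSubscribe(final Observer<? super String> observer) {
        final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        // The read loop runs on the executor, so this method returns immediately.
        executor.execute(() -> {
            try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
                // Loop until the subscriber cancels, not until the thread is interrupted.
                while (!unsubscribed.get()) {
                    String line = reader.readLine();
                    if (line != null) {
                        observer.onNext(line);
                    } else {
                        Thread.sleep(250); // wait for more text
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // executor is shutting down
            } catch (IOException e) {
                observer.onError(e);
            }
        });

        // Unsubscribing flips the flag that the loop checks.
        return () -> unsubscribed.set(true);
    }
}

class TailSketch {
    public static void main(String[] args) throws Exception {
        // With no argument, tail a throwaway temp file so the sketch still runs.
        Path file = args.length > 0 ? Paths.get(args[0]) : Files.createTempFile("tail", ".log");
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Subscription subscription = new FileWatcher(file, executor).onSubscribe(new Observer<String>() {
            @Override public void onNext(String line) { System.out.println(line); }
            @Override public void onCompleted() { }
            @Override public void onError(Throwable e) { e.printStackTrace(); }
        });

        Thread.sleep(2000);         // tail for two seconds
        subscription.unsubscribe(); // the loop sees the flag and exits
        executor.shutdown();
    }
}
```

Because the loop sleeps for 250 ms between polls, the watcher may take up to that long to notice the cancellation. In this sketch the watcher simply stops after unsubscribe() without calling onCompleted(), on the assumption that a canceled subscriber no longer wants notifications.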

Mike Strobel answered Sep 28 '22 04:09
