I am exploring reactive programming and RxJava. It is fun, but I am stuck on a problem for which I cannot find an answer. My basic question: what is a reactive-appropriate way to terminate an otherwise infinitely-running Observable? I also welcome critiques and reactive best practices regarding my code.
As an exercise, I am writing a log file tail utility. The stream of lines in the log file is represented by an Observable<String>
. To get the BufferedReader
to continue reading text that is added to the file, I ignore the usual reader.readLine() == null
termination check and instead interpret it to mean that my thread should sleep and wait for more logger text.
But while I can terminate the Observer using takeUntil
, I need to find a clean way to terminate the otherwise infinitely-running file watcher. I can write my own terminateWatcher
method/field, but that breaks the Observable/Observer encapsulation -- and I'd like to stay as strict to the reactive paradigm as possible.
Here is the Observable<String>
code:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
Here is the Observer code that prints the new lines as they come:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
My two questions are:
Since you are initiating the file watching when the subscription is requested, it makes sense to terminate it when the subscription ends, i.e., when the Subscription
is closed. This addresses one of the loose ends in your code comments: you return a Subscription
which tells the FileWatcher
to stop watching the file. Then replace your loop condition such that it checks whether the subscription has been canceled instead of checking whether the current thread has been interrupted.
Problem is, that isn't very helpful if your onSubscribe()
method never returns. Perhaps your FileWatcher
should require a scheduler to be specified, with the reading occurring on that scheduler.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With