I am totally new to RxJava and reactive programming. I have an assignment where I must read a file and store its contents in an Observable. I tried creating a Callable with a BufferedReader inside and passing it to Observable.fromCallable(), but I couldn't get it to work.
Could you please show me how I can do that?
I am using RxJava 2.0.
A basic solution: a nested class, FileObservableSource, produces the data, and Observable.defer() postpones creating the source until an Observer subscribes:
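import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposables;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
public class StackOverflow {
    public static void main(String[] args) {
        // defer() creates a fresh FileObservableSource for every subscriber
        final Observable<String> observable = Observable.defer(() -> new FileObservableSource("pom.xml"));
        observable.subscribe(
                line -> System.out.println("next line: " + line),
                Throwable::printStackTrace,
                () -> System.out.println("finished")
        );
    }
    static class FileObservableSource implements ObservableSource<String> {
        private final String filename;
        FileObservableSource(String filename) {
            this.filename = filename;
        }
        @Override
        public void subscribe(Observer<? super String> observer) {
            // the Observable protocol expects onSubscribe before any other signal
            observer.onSubscribe(Disposables.empty());
            // try-with-resources closes the file once all lines have been emitted
            try (Stream<String> lines = Files.lines(Paths.get(filename))) {
                lines.forEach(observer::onNext);
                observer.onComplete();
            } catch (IOException | UncheckedIOException e) {
                // Files.lines reports read errors during iteration as UncheckedIOException
                observer.onError(e);
            }
        }
    }
}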
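As an alternative to a hand-written ObservableSource, the same idea can be sketched with Observable.using(), which opens the file for each subscriber and closes it when the sequence terminates or is disposed. The class name FileLines and the helper lines(Path) are made up for this illustration; the file name "pom.xml" is taken from the example above.

```java
import io.reactivex.Observable;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FileLines {

    // Hypothetical helper: emits every line of the file as a separate item.
    static Observable<String> lines(Path path) {
        return Observable.using(
                () -> Files.lines(path),                             // open the file per subscriber
                stream -> Observable.fromIterable(stream::iterator), // emit the lines lazily
                Stream::close);                                      // release the file handle
    }

    public static void main(String[] args) {
        lines(Paths.get("pom.xml")).subscribe(
                line -> System.out.println("next line: " + line),
                Throwable::printStackTrace,
                () -> System.out.println("finished"));
    }
}
```

Because the stream is created inside using(), nothing is read until an Observer subscribes, so a separate defer() call is not needed in this variant.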
With Java 8, BufferedReader.lines() returns a Stream<String>, and that stream's iterator can be used in tandem with RxJava. See https://dzone.com/articles/java-8-stream-rx-java
You can get a Flowable using the following code:
 Flowable.fromIterable(bufferedReader.lines()::iterator)
I use Flowable and Single here to be more concise; it will still work with Observable. The snippet below is an example of how I read each line from the BufferedReader.
try (
        FileReader fr = new FileReader("./folder1/source.txt");
        BufferedReader br = new BufferedReader(fr);
        // e.g. I write the output into this file
        FileWriter fw = new FileWriter("./folder1/destination.txt");
        BufferedWriter bw = new BufferedWriter(fw)
) {
    // br.lines() gives you the Stream<String>;
    // br.lines()::iterator adapts it to an Iterable<String>.
    // splitLine, handlePinyin, appendToFile, print and pr are my own helpers (not shown).
    Flowable.fromIterable(br.lines()::iterator)
            // at this point you can do what you want with each line,
            // e.g. I split long strings into smaller Flowable<String> chunks
            .flatMap(i -> splitLine(i))
            // e.g. I then use the output to query an external server with Retrofit,
            // which returns a Single<String> result
            .flatMapSingle(i -> handlePinyin(pr, i))
            // subscribe() does not wait for asynchronous work, so the files may be closed
            // before it finishes; blockingSubscribe() with the same arguments waits for completion
            .subscribe(
                    // I save the results from the server to destination.txt
                    py -> appendToFile(bw, py),
                    // print and quit on error
                    err -> print("ERROR " + err),
                    () -> print("Completed!"));
} catch (IOException e) {
    print("Error " + e);
}
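The snippet above relies on helpers that are not shown, so here is a minimal self-contained sketch of the same pattern. It assumes a made-up transform method in place of splitLine and handlePinyin, and the class name CopyLines and method copy are likewise invented for illustration. It uses blockingSubscribe() so that the reader and writer stay open until every line has been handled.

```java
import io.reactivex.Flowable;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CopyLines {

    // Hypothetical stand-in for the per-line work done by the helpers in the snippet above.
    static String transform(String line) {
        return line.trim().toUpperCase();
    }

    // Reads source line by line, transforms each line, and writes it to destination.
    static void copy(Path source, Path destination) throws IOException {
        try (BufferedReader br = Files.newBufferedReader(source);
             BufferedWriter bw = Files.newBufferedWriter(destination)) {
            Flowable.fromIterable(br.lines()::iterator)
                    .map(CopyLines::transform)
                    // blockingSubscribe keeps the calling thread here until the
                    // sequence terminates, so both files are still open while lines flow
                    .blockingSubscribe(
                            line -> {
                                bw.write(line);
                                bw.newLine();
                            },
                            err -> System.err.println("ERROR " + err),
                            () -> System.out.println("Completed!"));
        }
    }

    public static void main(String[] args) throws IOException {
        // same paths as in the snippet above
        copy(Paths.get("./folder1/source.txt"), Paths.get("./folder1/destination.txt"));
    }
}
```

With asynchronous steps such as a network call in the chain, the blocking variant matters more, since a plain subscribe() could return while results are still arriving.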