Reactive Streams - batching with timeout

I am looking into replacing a homegrown log-processing library, which looks awfully close to Reactive Streams, with io.projectreactor. The objective is to reduce the code we maintain and to take advantage of new features added by the community (I am eyeing operator fusion).

As a start, I need to consume stdin and merge multiline log entries into text blobs that flow down the pipeline. The use case is explained in detail in the multiline log entries chapter of the Filebeat docs (except that we want it in-process).

So far the code I have is:

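BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
// generate() allows at most one sink.next() per invocation; complete on EOF instead of emitting null
Flux<String> lines = Flux.generate(sink -> {
    try { String line = input.readLine(); if (line == null) sink.complete(); else sink.next(line); }
    catch (IOException e) { sink.error(e); }
});
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
// doOnNext receives each LogRecord; doOnEach would receive a Signal<LogRecord>
logRecords.doOnNext(r -> System.out.printf("%s payload: %d chars%n", r.timestamp, r.payload.length()))
          .subscribe();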

This takes care of the multi-line merging when a new log header is detected, but the existing library also flushes the accumulated lines after a timeout (i.e. if no text is received within 5 seconds, the record is flushed).
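To pin down the two flush rules (new header, or 5 seconds of silence), here is a minimal plain-Java model of the joining behavior with no Reactor involved. It replays lines that carry a read timestamp and assumes a hypothetical convention that continuation lines start with whitespace; a real setup would more likely match headers with a regex. `TimedLine` and `join` are illustrative names, not part of any library. In a live stream the idle flush would be fired by a timer at `lastLineTime + timeout`; in this offline replay the gap is detected when the next line arrives, which yields the same grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class MultilineJoiner {

    /** A line plus the time (in milliseconds) at which it was read. Hypothetical helper type. */
    static final class TimedLine {
        final long timeMillis;
        final String text;

        TimedLine(long timeMillis, String text) {
            this.timeMillis = timeMillis;
            this.text = text;
        }
    }

    /**
     * Joins lines into records. The current record is flushed when a new header
     * line arrives, or when the gap since the previous line reaches idleTimeoutMillis.
     */
    static List<String> join(List<TimedLine> lines, Predicate<String> isHeader, long idleTimeoutMillis) {
        List<String> records = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long lastTime = 0;
        for (TimedLine line : lines) {
            boolean idle = !current.isEmpty() && line.timeMillis - lastTime >= idleTimeoutMillis;
            boolean newHeader = !current.isEmpty() && isHeader.test(line.text);
            if (idle || newHeader) {
                records.add(String.join("\n", current)); // flush the accumulated record
                current.clear();
            }
            current.add(line.text);
            lastTime = line.timeMillis;
        }
        if (!current.isEmpty()) {
            records.add(String.join("\n", current)); // end of input: flush whatever is left
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical convention: continuation lines start with whitespace, headers do not.
        Predicate<String> isHeader = s -> !s.isEmpty() && !Character.isWhitespace(s.charAt(0));
        List<TimedLine> input = Arrays.asList(
                new TimedLine(0, "ERROR boom"),
                new TimedLine(100, "  at Foo.bar"),
                new TimedLine(200, "  at Main.main"),
                new TimedLine(300, "INFO next"),
                new TimedLine(9000, "  orphan continuation")); // arrives after an 8.7 s gap
        for (String record : join(input, isHeader, 5000)) {
            System.out.println("--- record ---");
            System.out.println(record);
        }
    }
}
```

With a 5000 ms timeout the sample input yields three records: the ERROR entry with its two stack lines, "INFO next" (closed by the idle gap rather than by a header), and the orphaned continuation line.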

What would be the right way to model this in Reactor? Do I need to write my own operator, or can I customize one of the existing ones?

Any pointers to relevant examples and docs for achieving this use-case in Project Reactor or RxJava would be very much appreciated.

asked Jul 12 '17 10:07 by ddimitrov


2 Answers

It depends on how you identify the start and end of each buffer, so the following RxJava 2 code is intended as a hint about using the main source's value to open and close the buffer's gate:

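import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.TestScheduler;
import java.util.List;
import java.util.concurrent.TimeUnit;

TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

// open a buffer on a "Start" line; close it on an "End" line or after a 5-minute grace period
Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                 v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                                     Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 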

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();

Prints:

[Start, A, B, End]
[Start, C]
[Start, D, End]

It works by sharing the source via publish, which lets the opening and closing indicators reuse the same upstream values without running multiple copies of the source at once. Opening is governed by detecting a "Start" string on the line. Closing is governed either by detecting the "End" string or by a timer firing after a grace period.
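The open/close rules can be checked without RxJava using a small single-threaded model. This is a simplified sketch, not RxJava's implementation: it assumes the "Start" line lands in the buffer it opens (matching the printed output above) and that each buffer's timer starts when the buffer opens. As with buffer(openingIndicator, closingIndicator), buffers may overlap and lines that arrive while no buffer is open are dropped; emitting buffers still open at the end of input is an assumption. `Event`, `OpenBuffer` and `simulate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class GatedBufferModel {

    /** A line and the logical time (here: minutes) at which it arrives. */
    static final class Event {
        final long time;
        final String text;

        Event(long time, String text) {
            this.time = time;
            this.text = text;
        }
    }

    /** An open buffer and the time at which its grace-period timer fires. */
    static final class OpenBuffer {
        final List<String> items = new ArrayList<>();
        final long deadline;

        OpenBuffer(long deadline) {
            this.deadline = deadline;
        }
    }

    static List<List<String>> simulate(List<Event> events, long timeout) {
        List<List<String>> emitted = new ArrayList<>();
        List<OpenBuffer> open = new ArrayList<>();
        for (Event e : events) {
            // Timers that fired before (or exactly at) this event close their buffers, oldest first.
            for (Iterator<OpenBuffer> it = open.iterator(); it.hasNext(); ) {
                OpenBuffer b = it.next();
                if (b.deadline <= e.time) {
                    emitted.add(b.items);
                    it.remove();
                }
            }
            // Opening indicator: a "Start" line opens a new buffer with its own timer.
            if (e.text.contains("Start")) {
                open.add(new OpenBuffer(e.time + timeout));
            }
            // Every open buffer receives the line; with no open buffer the line is dropped.
            for (OpenBuffer b : open) {
                b.items.add(e.text);
            }
            // Closing indicator: an "End" line closes every open buffer.
            if (e.text.contains("End")) {
                for (OpenBuffer b : open) {
                    emitted.add(b.items);
                }
                open.clear();
            }
        }
        // Assumption: buffers still open when the input completes are emitted as-is.
        for (OpenBuffer b : open) {
            emitted.add(b.items);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, "Start"), new Event(0, "A"), new Event(0, "B"), new Event(0, "End"),
                new Event(0, "Start"), new Event(0, "C"),
                // five minutes pass here, like scheduler.advanceTimeBy(5, TimeUnit.MINUTES)
                new Event(5, "Start"), new Event(5, "D"), new Event(5, "End"));
        System.out.println(simulate(events, 5));
    }
}
```

Replaying the same sequence as the RxJava test yields the same three buffers, with the middle one closed by its timer rather than by an "End" line.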

Edit:

If the "Start" line is also the indicator for the next batch, you can replace the "End" check with a "Start" check. Because the previous buffer would otherwise also contain the new batch's header, remove that trailing header from it:

pp.publish(f)
.doOnNext(v -> {
    int s = v.size();
    if (s > 1 && v.get(s - 1).contains("Start")) {
        v.remove(s - 1);
    }
})
.subscribe(System.out::println);
answered Oct 14 '22 05:10 by akarnokd


The buffer operator seems to me the most suitable and simplest solution.

It has size-based and time-based strategies. Since you are processing a log, I think you can interpret the line count as the buffer size.

Here is an example that emits items grouped by 4 lines or by a 5-second timespan, whichever comes first:

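import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;

    Observable<String> lineReader = Observable.<String>create(emitter -> {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                emitter.onNext(line);
            }
            emitter.onComplete();   // end of input: lets buffer() emit the last partial batch
        } catch (IOException e) {
            emitter.onError(e);     // report I/O errors downstream instead of throwing
        }
    }).subscribeOn(Schedulers.newThread());

    lineReader
      .buffer(5, TimeUnit.SECONDS, 4)        // every 4 lines or every 5 seconds, whichever comes first
      .filter(lines -> !lines.isEmpty())     // the timer can fire while no lines are buffered
      .blockingSubscribe(System.out::println); // keeps the calling thread alive while stdin is read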
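For comparison with the header-based joining in the question, here is a plain-Java model of size-or-time batching. It is a simplification: the timespan is measured from the first line of each batch, which does not reproduce the exact timer scheduling of RxJava's buffer(timespan, unit, count). `TimedLine` and `batch` are hypothetical names. The sample input shows one consequence worth keeping in mind: batch boundaries depend only on counts and time, not on log headers, so a multi-line entry longer than the batch size ends up split across batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SizeOrTimeBatcher {

    /** A line plus the time (in milliseconds) at which it was read. Hypothetical helper type. */
    static final class TimedLine {
        final long timeMillis;
        final String text;

        TimedLine(long timeMillis, String text) {
            this.timeMillis = timeMillis;
            this.text = text;
        }
    }

    /** Emits a batch when it holds maxSize lines or has been open for timespanMillis. */
    static List<List<String>> batch(List<TimedLine> lines, int maxSize, long timespanMillis) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long batchStart = 0;
        for (TimedLine line : lines) {
            // Time-based flush, simplified: measured from the first line of the current batch.
            if (!current.isEmpty() && line.timeMillis - batchStart >= timespanMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                batchStart = line.timeMillis;
            }
            current.add(line.text);
            // Size-based flush.
            if (current.size() == maxSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // end of input; empty batches are never emitted
        }
        return batches;
    }

    public static void main(String[] args) {
        List<TimedLine> input = Arrays.asList(
                new TimedLine(0, "ERROR boom"),
                new TimedLine(100, "  at A"),
                new TimedLine(200, "  at B"),
                new TimedLine(300, "  at C"),
                new TimedLine(400, "  at D"),   // same log entry, but the first batch is already full
                new TimedLine(500, "INFO next"),
                new TimedLine(7000, "INFO later"));
        batch(input, 4, 5000).forEach(System.out::println);
    }
}
```

With a batch size of 4, the fifth line of the ERROR entry lands in the second batch together with the next entry's header, so a downstream stage would still have to re-join records.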
answered Oct 14 '22 05:10 by zella