Java split stream by predicate into stream of streams

I have hundreds of large (6GB) gzipped log files that I want to parse, reading each one through a GZIPInputStream. Suppose each one has the format:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

I'm streaming the gzipped file contents line by line through BufferedReader.lines(). The stream looks like:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 3",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

The start of every log entry can be identified by the predicate: line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to this predicate. Each "substream" should start when the predicate is true and collect lines while the predicate is false, until the next time the predicate is true, which marks the end of this substream and the start of the next. The result would look like:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

From there, I can take each substream and map it through new LogEntry(Stream<String> logLines) so as to aggregate related log lines into LogEntry objects.

Here's a rough idea of how that would look:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        // ByteArrayInputStream is a mock for a FileInputStream-based GZIPInputStream
        try (ByteArrayInputStream gzipInputStream =
                 new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
             InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
             BufferedReader reader = new BufferedReader(inputStreamReader)) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), so loading them entirely into memory (e.g. by storing each one as a List<String> lines) is not feasible.

Any help appreciated!

Alexander asked Mar 27 '18 23:03


2 Answers

Frederico's answer is probably the nicest way to solve this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer to a similar question, where I proposed using a custom iterator to create a chunked stream. This approach also works on streams that are not created by input readers.

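import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private final Iterator<T>  incoming;
    private final Predicate<T> startOfNewEntry;
    private T                  nextLine;    // one-element look-ahead
    private boolean            hasNextLine; // false once incoming is exhausted

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        advance();
    }

    // Reads the next element into the look-ahead slot, or marks the input as exhausted.
    private void advance()
    {
        hasNextLine = incoming.hasNext();
        nextLine = hasNextLine ? incoming.next() : null;
    }

    @Override
    public boolean hasNext()
    {
        return hasNextLine;
    }

    @Override
    public Stream<T> next()
    {
        if (!hasNextLine)
            throw new NoSuchElementException();

        // Collect the look-ahead element plus everything up to the next entry start.
        // Tracking exhaustion with a flag (instead of re-testing nextLine) also
        // terminates when the last entry consists of a single start line.
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
            advance();
        } while (hasNextLine && !startOfNewEntry.test(nextLine));

        return nextEntrysLines.stream();
    }
}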

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

The iterator always looks one line ahead. As soon as that line is the beginning of a new entry, it wraps the previous entry in a stream and returns it from next(). Each entry is buffered in a list first, so only one entry's lines are held in memory at a time. The factory method streamOf turns this iterator into a stream, to be used as in the example above.

I changed the split condition from a regex to a Predicate, so you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.

Note that I only tested it with the example data above, so I don't know how it would behave with more complicated, erroneous, or empty input.

Malte Hartwig answered Oct 19 '22 06:10


I think the main problem is that you are reading line by line and trying to create a LogEntry instance out of the lines, instead of reading block by block (which might cover many lines).

For this, you could use Scanner.findAll (available since Java 9) with a proper regex:

String input =
        "Start of log entry 1\n"        +
        "    ...some log details 1.1\n" +
        "    ...some log details 1.2\n" +
        "    ...some log details 1.3\n" +
        "Start of log entry 2\n"        +
        "    ...some log details 2.1\n" +
        "    ...some log details 2.2\n" +
        "    ...some log details 2.3\n" +
        "Start of log entry 3\n"        +
        "    ...some log details 3.1\n" +
        "    ...some log details 3.2\n" +
        "    ...some log details 3.3";

try (ByteArrayInputStream gzip = 
         new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
     InputStreamReader reader = new InputStreamReader(gzip);
     Scanner scanner = new Scanner(reader)) {

    String START = "Start of log entry \\d+";
    Pattern pattern = Pattern.compile(
            START + "(?<=" + START + ").*?(?=" + START + "|$)", 
            Pattern.DOTALL);

    scanner.findAll(pattern)
            .map(MatchResult::group)
            .map(s -> s.split("\\R"))
            .map(LogEntry::new)
            .forEach(System.out::println);

} catch (IOException e) {
    throw new UncheckedIOException(e);
}

So, this works by lazily finding matches in the Scanner instance. Scanner.findAll returns a Stream<MatchResult>, and MatchResult.group() returns the matched String. We then split this string on line breaks (\\R), which yields a String[] with one element per line. Then, assuming LogEntry has a constructor that accepts a String[] argument, we transform each of these arrays into a LogEntry instance. Finally, assuming LogEntry has an overridden toString() method, we print each LogEntry instance to the output.
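For completeness, here is a minimal sketch of what such a LogEntry could look like. The class is not shown in the question, so everything here (the header/details split, the accessor names, the toString() format) is an assumption made only to satisfy the String[] constructor and toString() requirements mentioned above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical LogEntry: the first line is treated as the header,
// all remaining lines as (trimmed) detail lines.
final class LogEntry {
    private final String header;
    private final List<String> details;

    LogEntry(String[] lines) {
        if (lines.length == 0) {
            throw new IllegalArgumentException("a log entry needs at least a header line");
        }
        this.header = lines[0];
        this.details = Arrays.stream(lines, 1, lines.length)
                             .map(String::trim)
                             .collect(Collectors.toList());
    }

    String header() {
        return header;
    }

    List<String> details() {
        return details;
    }

    @Override
    public String toString() {
        return "LogEntry{header='" + header + "', details=" + details + "}";
    }
}
```

With a class shaped like this, .map(LogEntry::new) resolves to the String[] constructor, and forEach(System.out::println) prints one line per entry.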

It is worth mentioning that the Scanner starts its work when forEach is invoked on the stream.

A separate note concerns the regex we're using to match log entries in the input. I'm not a regex expert, so I'm almost sure there's room for improvement here. First of all, we're using Pattern.DOTALL so that the . matches not only ordinary characters but also line breaks. Then there is the actual regex. The idea is that it matches and consumes Start of log entry \\d+, then it uses a look-behind against Start of log entry \\d+, then it consumes characters from the input in a non-greedy manner (this is the .*? part), and finally it looks ahead to check whether there is another occurrence of Start of log entry \\d+ or whether the end of the input has been reached. Please refer to this amazing article about regular expressions if you want to dig into this subject.
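To see the non-greedy match and the look-ahead in isolation, here is a small sketch that runs a simplified variant of the pattern over an in-memory string with a plain Matcher instead of a Scanner. Two things differ from the pattern above and are my own assumptions: the look-behind is left out (the header has just been consumed at that point, so it appears to be redundant), and \z is used in place of $ to mean "very end of input". The class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LogEntryRegexDemo {
    private static final String START = "Start of log entry \\d+";

    // Header, then as little as possible (.*?) up to the next header
    // or the end of the input (\z). DOTALL lets . match line breaks.
    static final Pattern ENTRY = Pattern.compile(
            START + ".*?(?=" + START + "|\\z)", Pattern.DOTALL);

    // Returns every matched entry, header line included.
    static List<String> entries(String input) {
        List<String> result = new ArrayList<>();
        Matcher matcher = ENTRY.matcher(input);
        while (matcher.find()) {
            result.add(matcher.group());
        }
        return result;
    }

    public static void main(String[] args) {
        String input =
                "Start of log entry 1\n" +
                "    ...some log details 1.1\n" +
                "Start of log entry 2\n" +
                "    ...some log details 2.1";
        for (String entry : entries(input)) {
            System.out.println("--- entry ---");
            System.out.println(entry.trim());
        }
    }
}
```

Each match keeps its trailing line break (except the last one here), which is why the pipeline above splits on \\R afterwards rather than relying on the match boundaries.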


I don't know of any similar alternative if you're not on Java 9+. What you could do, though, is to create a custom Spliterator that wraps the Spliterator returned by the stream returned by BufferedReader.lines() and add the desired parsing behavior to it. Then, you'd need to create a new Stream out of this Spliterator. Not a trivial task at all...
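To give an idea of what that could look like, here is a rough, untuned sketch of such a wrapping Spliterator that works on Java 8. It is only one possible design: the names GroupingSpliterator and split are made up, it produces a Stream<List<T>> rather than a Stream<Stream<T>> (map each list with List::stream if you need the latter), and any lines that appear before the first start line end up in a leading group of their own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper: groups the elements of a source spliterator into lists,
// starting a new list whenever startsGroup matches an element.
final class GroupingSpliterator<T> extends Spliterators.AbstractSpliterator<List<T>> {
    private final Spliterator<T> source;
    private final Predicate<? super T> startsGroup;
    private List<T> current = new ArrayList<>(); // group currently being filled
    private List<T> completed;                   // finished group waiting to be emitted

    GroupingSpliterator(Spliterator<T> source, Predicate<? super T> startsGroup) {
        super(Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL);
        this.source = source;
        this.startsGroup = startsGroup;
    }

    // Called once per source element.
    private void add(T element) {
        if (startsGroup.test(element) && !current.isEmpty()) {
            completed = current;          // the previous group is finished
            current = new ArrayList<>();
        }
        current.add(element);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        // Pull source elements until a group is completed or the source runs dry.
        while (completed == null && source.tryAdvance(this::add)) {
            // nothing else to do per element
        }
        if (completed == null && !current.isEmpty()) {
            completed = current;          // source exhausted: flush the open group
            current = new ArrayList<>();
        }
        if (completed == null) {
            return false;
        }
        List<T> group = completed;
        completed = null;
        action.accept(group);
        return true;
    }

    // Wraps a sequential stream; closing the result also closes the source stream.
    static <T> Stream<List<T>> split(Stream<T> lines, Predicate<? super T> startsGroup) {
        return StreamSupport
                .stream(new GroupingSpliterator<>(lines.spliterator(), startsGroup), false)
                .onClose(lines::close);
    }

    public static void main(String[] args) {
        split(Stream.of("Start of log entry 1", "    ...some log details",
                        "Start of log entry 2", "    ...some log details"),
              line -> line.startsWith("Start of log entry"))
            .forEach(System.out::println);
    }
}
```

Only one group is buffered at a time, so memory use is bounded by the largest single log entry rather than by the file size. With a reader it would be used as GroupingSpliterator.split(reader.lines(), isLogEntryStart).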

fps answered Oct 19 '22 07:10