
Stream processing while relying on previous and next elements

I have to process a fixed-width file with a predefined record layout. Several record types exist, and the first character of a record determines its type. Because the file is fixed width, a whole record does not always fit on one line, so the second character is the sequence number of the line within the record. For example:

0This is the header record------------------------------------
1This is another record always existing out of one lin--------
21This is a record that can be composed out of multiple parts.
22This is the second part of record type 2--------------------
21This is a new record of type 2, first part.-----------------
22This is the second part of record type 2--------------------
23This is the third part of record type 2---------------------
...

With the Stream API, I would like to parse this file:

Stream<String> lines = Files.lines(Paths.get(args[1]));

lines.map(line -> RecordFactory.createRecord(line)).collect(Collectors.toList());

But since the stream delivers the file line by line, the mapping of a type 2 record is incomplete when it parses the record's first line (record type 2, sequence 1). The next line (record type 2, sequence 2) should be added to the result of the previous mapping.

How can I solve this problem with lambdas without having to sacrifice thread safety?

Juru asked Sep 26 '22 12:09



1 Answer

Operating on consecutive elements that match a predicate is currently not easy to do with the Stream API.
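To make the difficulty concrete: the grouping has to compare each line with the one before it, which a stateless map step cannot do. For comparison, here is a minimal sketch of the same grouping with a plain loop and no library. It assumes that only multi-part records carry a digit as their second character; the names LineGrouper, partNumber, and group are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LineGrouper {

    // Part number of a line: the second character if it is an ASCII digit,
    // otherwise 1 (assumption: single-line records have no sequence digit).
    static int partNumber(String line) {
        if (line.length() > 1) {
            char c = line.charAt(1);
            if (c >= '0' && c <= '9') {
                return c - '0';
            }
        }
        return 1;
    }

    // Starts a new group whenever the part number does not increase.
    static List<List<String>> group(List<String> lines) {
        List<List<String>> groups = new ArrayList<>();
        int previousPart = 0;
        for (String line : lines) {
            int part = partNumber(line);
            if (groups.isEmpty() || part <= previousPart) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(line);
            previousPart = part;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "0header", "1single", "21first", "22second",
            "21first", "22second", "23third");
        // Prints one list per record.
        group(lines).forEach(System.out::println);
    }
}
```

The loop keeps the previous part number in a local variable, which is exactly the state that a per-line lambda does not have access to.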

One option would be to use the StreamEx library, which offers the groupRuns operation:

Returns a stream consisting of lists of elements of this stream where adjacent elements are grouped according to supplied predicate.

The following code groups consecutive lines together as long as the part number of each line is strictly greater than that of the previous line. The part number is extracted with a regular expression that skips the first digit (the record type) and captures the digits that follow it.

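import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import one.util.streamex.StreamEx;

// Anchored at the start of the line so that digits inside the record text are never matched:
// skip the record type digit, capture the part number.
private static final Pattern PATTERN = Pattern.compile("^\\d(\\d+)");

public static void main(String[] args) throws IOException {
    try (StreamEx<String> stream = StreamEx.ofLines(Paths.get("..."))) {
        // A line joins the current group while its part number keeps increasing.
        List<Record> records =
            stream.groupRuns((s1, s2) -> getRecordPart(s2) > getRecordPart(s1))
                  .map(RecordFactory::createRecord)
                  .toList();
    }
}

private static int getRecordPart(String str) {
    Matcher matcher = PATTERN.matcher(str);
    if (matcher.find()) {
        return Integer.parseInt(matcher.group(1));
    }
    return 1; // no part number found: the record is on a single line
}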

This assumes that your RecordFactory creates a Record from a List<String> rather than from a String. Note that this solution can be run in parallel, although if you want better parallel performance it would probably be better to read the content of the file into a List first and post-process that list (at the cost of memory).
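Neither Record nor RecordFactory is shown in the question, so the following is only a sketch of what a List<String>-based factory might look like. It assumes the first character is the record type, that multi-part lines carry a one-digit sequence number as the second character, and that the parts are simply concatenated; the fields type and content are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical record: a type character plus the concatenated content of all parts.
class Record {
    final char type;
    final String content;

    Record(char type, String content) {
        this.type = type;
        this.content = content;
    }
}

class RecordFactory {

    // Builds one Record from the lines of a single group (assumed non-empty).
    static Record createRecord(List<String> parts) {
        char type = parts.get(0).charAt(0);
        StringBuilder content = new StringBuilder();
        for (String part : parts) {
            // Skip the type character, and the sequence digit if there is one.
            boolean hasSequence = part.length() > 1
                    && part.charAt(1) >= '0' && part.charAt(1) <= '9';
            content.append(part.substring(hasSequence ? 2 : 1));
        }
        return new Record(type, content.toString());
    }

    public static void main(String[] args) {
        Record r = createRecord(Arrays.asList("21first ", "22second"));
        System.out.println(r.type + ": " + r.content);
    }
}
```

With a factory of this shape, the method reference RecordFactory::createRecord can be passed directly to map on the stream of grouped lines.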

Tunaki answered Sep 28 '22 03:09
