
Reading CSV header with Dataflow

I have a CSV file, and I don't know the column names ahead of time. I need to output the data in JSON after some transformations in Google Dataflow.

What's the best way to take the header row and propagate its labels through all the rows?

For example:

a,b,c
1,2,3
4,5,6

...becomes (approximately):

{a:1, b:2, c:3}
{a:4, b:5, c:6}
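(For reference, the core row transformation, independent of any Dataflow source: pair each header label with the corresponding field of a data row. This is just a plain-Java sketch with an illustrative helper name; it is not from either answer below.)

```java
import java.util.StringJoiner;

public class CsvRowToJson {

    // Builds a JSON object string by zipping header labels with row fields.
    // Assumes header and fields have the same length and contain no quotes.
    static String toJsonLine(String[] header, String[] fields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (int i = 0; i < header.length; i++) {
            json.add("\"" + header[i] + "\":\"" + fields[i] + "\"");
        }
        return json.toString();
    }

    public static void main(String[] args) {
        String[] header = "a,b,c".split(",");
        System.out.println(toJsonLine(header, "1,2,3".split(",")));
        // prints {"a":"1","b":"2","c":"3"}
    }
}
```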
Maximilian asked Dec 23 '16


2 Answers

You should implement a custom FileBasedSource (similar to TextIO.TextSource) that reads the first line and stores the header data:

    @Override
    protected void startReading(final ReadableByteChannel channel)
    throws IOException {
        lineReader = new LineReader(channel);

        if (lineReader.readNextLine()) {
            final String headerLine = lineReader.getCurrent().trim();
            header = headerLine.split(",");
            readingStarted = true;
        }
    }

and later, while reading the remaining lines, prepend the header to the current line's data:

    @Override
    protected boolean readNextRecord() throws IOException {
        if (!lineReader.readNextLine()) {
            return false;
        }

        final String line = lineReader.getCurrent();
        final String[] data = line.split(",");

        // assumes all lines are valid
        final StringBuilder record = new StringBuilder();
        for (int i = 0; i < header.length; i++) {
            record.append(header[i]).append(":").append(data[i]).append(", ");
        }

        currentRecord = record.toString();
        return true;
    }

I've implemented a quick (complete) solution, available on GitHub. I also added a Dataflow unit test to demonstrate reading:

@Test
public void test_reading() throws Exception {
    final File file =
            new File(getClass().getResource("/sample.csv").toURI());
    assertThat(file.exists()).isTrue();

    final Pipeline pipeline = TestPipeline.create();

    final PCollection<String> output =
            pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));

    DataflowAssert
            .that(output)
            .containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");

    pipeline.run();
}

where sample.csv has the following content:

a,b,c
1,2,3
4,5,6
robosoul answered Oct 06 '22

I have created a solution based on Luka's source code (see the previous answer). Luka's code on GitHub is for dataflow-1.x, and implements a FileBasedSource which extracts the first line, caches it, and then prepends it to every following line. This forces the entire file to be processed on a single node (the source is not splittable).

My variant of the FileBasedSource instead just returns the first line of a file; as described in the class javadoc this line can then be split (as desired) and used as a side-input to the logic which processes the complete file (which can then be done in parallel). The code is compatible with Beam 2.x (tested on Beam 2.4.0).
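The two-phase pattern described above (read the header once, then broadcast it while the data rows are transformed independently) can be sketched without the Beam APIs using plain Java; the class and method names here are illustrative, not from the linked post:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HeaderAsSideInput {

    // Phase 1: extract only the header row (cheap, done once).
    static String[] readHeader(List<String> lines) {
        return lines.get(0).split(",");
    }

    // Phase 2: transform data rows independently; the header is handed to
    // every worker, analogous to a Beam side input, so rows can be
    // processed in parallel.
    static List<String> toRecords(List<String> lines, String[] header) {
        return lines.stream()
                .skip(1)            // drop the header row
                .parallel()         // rows are independent of each other
                .map(line -> {
                    String[] data = line.split(",");
                    StringBuilder rec = new StringBuilder();
                    for (int i = 0; i < header.length; i++) {
                        if (i > 0) rec.append(", ");
                        rec.append(header[i]).append(":").append(data[i]);
                    }
                    return rec.toString();
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "1,2,3", "4,5,6");
        System.out.println(toRecords(lines, readHeader(lines)));
        // prints [a:1, b:2, c:3, a:4, b:5, c:6]
    }
}
```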

See http://moi.vonos.net/cloud/beam-read-header/

Simon Kitching answered Oct 06 '22