 

Parquet Writer to buffer or byte stream

I have a Java application that converts JSON messages to Parquet format. Is there a Parquet writer in Java that writes to a buffer or byte stream? Most of the examples I have seen write to files.

asked Oct 17 '16 at 14:10 by vijju


People also ask

What encoding does Parquet use?

Among its encodings, Parquet implements the RLE/bit-packing hybrid encoding in RunLengthBitPackingHybridEncoder, which RunLengthBitPackingHybridValuesWriter uses to encode and write column values. This encoding supports only dictionary indices, Boolean values, and repetition and definition levels in the data pages.
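To make the run-length half of that hybrid concrete, here is a minimal Java sketch (RleRunSketch and encodeRleRuns are hypothetical names, not parquet-mr API) that emits only RLE runs, following the layout in the Parquet format specification: each run starts with an unsigned LEB128 varint header equal to the run length shifted left by one (low bit 0 marks an RLE run), followed by the repeated value in ceil(bitWidth / 8) little-endian bytes. It never emits bit-packed runs and does not write any length or bit-width prefix that the enclosing page may add.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class RleRunSketch {

    // Writes an unsigned LEB128 varint: 7 bits per byte, high bit = "more bytes follow".
    static void writeUnsignedVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes values using RLE runs only (no bit-packed runs).
    // Each run: varint header (runLength << 1, low bit 0 = RLE run),
    // then the repeated value in ceil(bitWidth / 8) little-endian bytes.
    static byte[] encodeRleRuns(int[] values, int bitWidth) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int byteWidth = (bitWidth + 7) / 8;
        int i = 0;
        while (i < values.length) {
            int value = values[i];
            int runLength = 1;
            // Extend the run while the next value repeats.
            while (i + runLength < values.length && values[i + runLength] == value) {
                runLength++;
            }
            writeUnsignedVarInt(out, runLength << 1);
            for (int b = 0; b < byteWidth; b++) {
                out.write((value >>> (8 * b)) & 0xFF);
            }
            i += runLength;
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeRleRuns(new int[] {1, 1, 1, 0, 0}, 1);
        System.out.println(Arrays.toString(encoded)); // [6, 1, 4, 0]
    }
}
```

Pure RLE like this only pays off for highly repetitive data such as definition levels of a mostly non-null column; the real encoder in parquet-mr also uses bit-packed groups when values do not repeat.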

How do I extract data from a Parquet file?

With the query results stored in a DataFrame, you can use petl (a Python ETL library) to extract, transform, and load the Parquet data: for example, extract the Parquet data, sort it by the Column1 column, and load the result into a CSV file.

Why is Parquet the best choice for Spark?

Parquet is a columnar format, so queries that need only a few columns read less data. As a result it typically executes faster than other common formats such as Avro and JSON, and it usually consumes less disk space than Avro or JSON.

Is Parquet better than CSV?

Parquet files are easier to work with because they are supported by so many different projects. Parquet stores the file schema in the file metadata. CSV files don't store file metadata, so readers need to either be supplied with the schema or the schema needs to be inferred.


2 Answers

TL;DR: you need to implement OutputFile, e.g. something along the lines of:

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;

public class ParquetBufferedWriter implements OutputFile {

    private final BufferedOutputStream out;

    public ParquetBufferedWriter(BufferedOutputStream out) {
        this.out = out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

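    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {
            // getPos() must return the number of bytes written so far: the
            // Parquet writer uses it to record column-chunk and footer offsets.
            private long pos = 0;

            @Override
            public long getPos() throws IOException {
                return pos;
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                pos++;
            }
        };
    }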

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

}

And your writer would be something like:

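import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.generic.GenericData.Record;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

// SCHEMA (an Avro Schema) and records are assumed to be defined elsewhere.
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
try (BufferedOutputStream buffered = new BufferedOutputStream(bytes)) {
    ParquetBufferedWriter out = new ParquetBufferedWriter(buffered);
    try (ParquetWriter<Record> writer = AvroParquetWriter
            .<Record>builder(out)
            .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
            .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
            .withSchema(SCHEMA)
            .build()) {
        for (Record record : records) {
            writer.write(record);
        }
    }
    // Leaving the outer try closes 'buffered', flushing the footer into 'bytes'.
} catch (IOException e) {
    throw new IllegalStateException(e);
}
byte[] parquetBytes = bytes.toByteArray();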
answered Sep 19 '22 at 07:09 by naimdjon
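Whichever OutputFile variant you use, getPos() has to report how many bytes have been written so far, because the Parquet writer uses it to record where each column chunk and the footer begin; a stub that always returns 0 produces a file whose metadata points at the wrong offsets. One way to keep the counting logic out of the anonymous class is a small counting wrapper, sketched below using only java.io. CountingOutputStream is a hypothetical helper, not a Parquet or JDK class.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: forwards bytes to the wrapped stream and counts them,
// so the count can serve as the current file position.
public class CountingOutputStream extends FilterOutputStream {

    private long count = 0;

    public CountingOutputStream(OutputStream out) {
        super(out);
    }

    // Number of bytes written so far.
    public long getCount() {
        return count;
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        count++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward the whole slice at once instead of FilterOutputStream's
        // byte-by-byte default.
        out.write(b, off, len);
        count += len;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (CountingOutputStream counting = new CountingOutputStream(buffer)) {
            counting.write("PAR1".getBytes(StandardCharsets.US_ASCII));
            counting.write(0);
            System.out.println(counting.getCount()); // 5
        }
    }
}
```

Inside createPositionOutputstream(), the anonymous PositionOutputStream could then delegate its write methods to such a wrapper and implement getPos() as return counting.getCount();.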


I also needed to write to a stream, so I completed the example given by naimdjon. The following works fine for me.

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.BufferedOutputStream;
import java.io.IOException;

class ParquetBufferedWriter implements OutputFile {

    private final BufferedOutputStream out;

    public ParquetBufferedWriter(BufferedOutputStream out) {
        this.out = out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    private PositionOutputStream createPositionOutputstream() {
        return new PositionOutputStream() {

            // Bytes written so far; a long avoids overflow for files over 2 GB.
            private long pos = 0;

            @Override
            public long getPos() throws IOException {
                return pos;
            }

            @Override
            public void flush() throws IOException {
                out.flush();
            }

            @Override
            public void close() throws IOException {
                // Closing also flushes bytes still held by the BufferedOutputStream.
                out.close();
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                pos++;
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                // Bulk write instead of OutputStream's byte-by-byte default.
                out.write(b, off, len);
                pos += len;
            }
        };
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return createPositionOutputstream();
    }

    @Override
    public boolean supportsBlockSize() {
        return false;
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }
}
answered Sep 21 '22 at 07:09 by breadcrumb42
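Once the ParquetWriter is closed, the underlying buffer (for example a ByteArrayOutputStream wrapped by the BufferedOutputStream, which is an assumption here) should hold a complete Parquet file. A cheap stdlib-only sanity check relies on the file layout from the Parquet format specification: the file starts and ends with the 4-byte magic PAR1, and the 4 bytes before the trailing magic give the little-endian length of the footer, which holds the file metadata (schema, row groups, column-chunk offsets). ParquetBufferCheck.footerLength below is a hypothetical helper; it does not parse the Thrift-encoded footer itself, so passing it does not prove the file is valid.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParquetBufferCheck {

    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Returns the footer length of an in-memory Parquet file, assuming the layout
    // "PAR1" <data> <footer> <4-byte little-endian footer length> "PAR1".
    static int footerLength(byte[] file) {
        if (file.length < 12) {
            throw new IllegalArgumentException("too short to be a Parquet file");
        }
        byte[] head = Arrays.copyOfRange(file, 0, 4);
        byte[] tail = Arrays.copyOfRange(file, file.length - 4, file.length);
        if (!Arrays.equals(head, MAGIC) || !Arrays.equals(tail, MAGIC)) {
            throw new IllegalArgumentException("missing PAR1 magic bytes");
        }
        int length = ByteBuffer.wrap(file, file.length - 8, 4)
                .order(ByteOrder.LITTLE_ENDIAN)
                .getInt();
        // The footer must fit between the leading magic and the length field.
        if (length < 0 || length > file.length - 12) {
            throw new IllegalArgumentException("footer length out of range: " + length);
        }
        return length;
    }

    public static void main(String[] args) {
        // Synthetic buffer, not a real Parquet file: magic, 3 placeholder
        // footer bytes, footer length 3 (little-endian), magic.
        ByteArrayOutputStream fake = new ByteArrayOutputStream();
        fake.write(MAGIC, 0, 4);
        fake.write(new byte[] {7, 7, 7}, 0, 3);
        fake.write(new byte[] {3, 0, 0, 0}, 0, 4);
        fake.write(MAGIC, 0, 4);
        System.out.println(footerLength(fake.toByteArray())); // 3
    }
}
```

For example, footerLength(bytes.toByteArray()) can be called after the writer is closed. Reading the buffer back with a real Parquet reader remains the definitive test, since a wrong getPos() corrupts offsets inside the footer rather than these trailing bytes.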