I would like to use Apache's parquet-mr project to read/write Parquet files programmatically with Java. I can't seem to find any documentation for how to use this API (aside from going through the source code and seeing how it's used) -- just wondering if any such documentation exists?
With the query results stored in a DataFrame, we can use petl to extract, transform, and load the Parquet data. In this example, we extract Parquet data, sort the data by the Column1 column, and load the data into a CSV file.
Convert Parquet to CSV: We can now write our multiple Parquet files out to a single CSV file using the to_csv method. Make sure to set single_file to True and index to False. Let's verify that this actually worked by reading the CSV file into a pandas DataFrame.
I wrote a blog article about reading Parquet files (http://www.jofre.de/?p=1459) and came up with the following solution, which can even read INT96 fields.
You need the following Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.0</version>
    </dependency>
</dependencies>
The code is essentially:
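import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

public class Main {
    // Location of the input file; adjust to your environment.
    private static Path path = new Path("file:\\C:\\Users\\file.snappy.parquet");

    // Prints every primitive value of a record as "<field name> <value>".
    // Nested (non-primitive) fields are skipped.
    private static void printGroup(Group g) {
        int fieldCount = g.getType().getFieldCount();
        for (int field = 0; field < fieldCount; field++) {
            // A field may hold zero, one, or many values depending on its repetition.
            int valueCount = g.getFieldRepetitionCount(field);
            Type fieldType = g.getType().getType(field);
            String fieldName = fieldType.getName();
            for (int index = 0; index < valueCount; index++) {
                if (fieldType.isPrimitive()) {
                    System.out.println(fieldName + " " + g.getValueToString(field, index));
                }
            }
        }
    }

    public static void main(String[] args) throws IllegalArgumentException {
        Configuration conf = new Configuration();
        try {
            // The footer holds the file metadata, including the schema.
            ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
            MessageType schema = readFooter.getFileMetaData().getSchema();
            ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
            PageReadStore pages = null;
            try {
                // Read the file one row group at a time until none are left.
                while (null != (pages = r.readNextRowGroup())) {
                    final long rows = pages.getRowCount();
                    System.out.println("Number of rows: " + rows);
                    // Assemble the column pages of this row group back into records.
                    final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                    final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
                    for (int i = 0; i < rows; i++) {
                        final Group g = recordReader.read();
                        printGroup(g);
                        // TODO Compare to System.out.println(g);
                    }
                }
            } finally {
                r.close();
            }
        } catch (IOException e) {
            System.out.println("Error reading parquet file.");
            e.printStackTrace();
        }
    }
}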
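Since INT96 fields were mentioned above: in practice they usually hold legacy timestamps, stored as 12 bytes (8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number, both little-endian). The following is only a sketch of how such a value could be turned into a java.time.Instant, assuming the column really is a timestamp and that you already have the raw 12 bytes (for example from g.getInt96(field, index).getBytes()). The class name Int96Timestamps and the method decode are made up for illustration and are not part of parquet-mr.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

// Hypothetical helper, not part of parquet-mr.
public class Int96Timestamps {
    // Julian day number of the Unix epoch (1970-01-01).
    private static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    private static final long SECONDS_PER_DAY = 86_400L;

    // Decodes a 12-byte INT96 timestamp: 8 bytes nanoseconds-of-day,
    // then 4 bytes Julian day number, both little-endian.
    public static Instant decode(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 values must be exactly 12 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();
        int julianDay = buf.getInt();
        long epochDay = julianDay - JULIAN_DAY_OF_EPOCH;
        // Instant normalizes the nanosecond adjustment into seconds and nanos.
        return Instant.ofEpochSecond(epochDay * SECONDS_PER_DAY, nanosOfDay);
    }

    public static void main(String[] args) {
        // Hand-built sample: Julian day 2440589 (1970-01-02), one hour into the day.
        byte[] sample = ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .putLong(3_600L * 1_000_000_000L)
                .putInt(2_440_589)
                .array();
        System.out.println(decode(sample)); // prints 1970-01-02T01:00:00Z
    }
}
```

The result is an instant on the UTC timeline; whether the writer intended UTC or local time depends on the tool that produced the file, so check that before relying on the values.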