Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to write TIMESTAMP logical type (INT96) to parquet, using ParquetWriter?

I have a tool that uses a org.apache.parquet.hadoop.ParquetWriter to convert CSV data files to parquet data files.

Currently, it only handles int32, double, and string

I need to support the parquet timestamp logical type (annotated as int96), and I am lost on how to do that because I can't find a precise specification online.

It appears this timestamp encoding (int96) is rare and not well supported. I've found very little specification details online. This github README states that:

Timestamps saved as an int96 are made up of the nanoseconds in the day (first 8 byte) and the Julian day (last 4 bytes).

Specifically:

  1. Which parquet Type do I use for the column in MessageType schema? I assume I should use the primitive type, PrimitiveTypeName.INT96, but I'm not sure if there may be a way to specify a logical type?
  2. How do I write the data? i.e. In what format do I write the timestamp to the group? For an INT96 timestamp, I assume I must write some binary type?

Here is a simplified version of my code that demonstrates what I am trying to do. Specifically, take a look at the "TODO" comments, these are the two points in the code that correlate to the questions above.

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));

// TODO: 
//   Specify the TIMESTAMP type. 
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null)); 

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,
  true,
  false,
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
  rowNum ++;
  Group group = f.newGroup();
  colIndex = 0;
  for (String record : csvRecord) {
    if (record == null || record.isEmpty() || record.equals( "NULL")) {
      colIndex++;
      continue;
    }


    record = record.trim();
    String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
    MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);

    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3:
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new NotImplementedException();
    }
  }
  writer.write(group);
}
writer.close();
like image 766
James Wierzba Avatar asked Feb 12 '19 19:02

James Wierzba


1 Answers

  1. INT96 timestamps use the INT96 physical type without any logical type, so don't annotate them with anything.
  2. If you are interested in the structure of an INT96 timestamp, take a look here. If you would like to see sample code that converts to and from this format, take a look at this file from Hive.
like image 85
Zoltan Avatar answered Sep 19 '22 09:09

Zoltan