I recently had a requirement to generate Parquet files that could be read by Apache Spark using only Java (with no additional software installed, such as Apache Drill, Hive or Spark). The files also needed to be saved to S3, so I will share details on how to do both.
There were no simple-to-follow guides on how to do this. I'm also not a Java programmer, so the concepts of Maven, Hadoop, etc. were all foreign to me, and it took me nearly two weeks to get this working. I'd like to share my personal guide below on how I achieved this.
Parquet requires a Hive metastore version of 1.2 or above in order to use TIMESTAMP.
CSVs are row-orientated, which means they're slow to query and difficult to store efficiently. That's not the case with Parquet, which is a column-orientated storage option. The size difference between the two can be enormous for identical datasets.
CSV should generally be the fastest to write, JSON the easiest for a human to understand and Parquet the fastest to read. CSV is the de facto standard for a lot of data, and for good reasons: it's [relatively] easy to comprehend for both users and computers, and it is made more accessible via Microsoft Excel.
Disclaimer: The code samples below in no way represent best practices and are only presented as a rough how-to.
Dependencies:
I'll be using NetBeans as my IDE.
Some info regarding parquet in Java (For noobs such as me):
Pre-requisite:
You must have Hadoop on the Windows machine that will be running the Java code. The good news is that you don't need to install the entire Hadoop software; you need only two files:
These can be downloaded here. You will need version 2.8.1 for this example (due to parquet-avro 1.9.0).
Add a new System Variable (not user variable) called: HADOOP_HOME with the value C:\hadoop-2.8.1
Modify the System Path variable (not user variable) and add the following to the end: %HADOOP_HOME%\bin
If this config was not done properly you will get the following error at run-time: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
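A quick sanity check before running the writer can save you from that rather cryptic stack trace. The sketch below is only an illustration: the class and method names are my own, and it checks nothing more than that HADOOP_HOME is set and that a bin folder exists underneath it. It does not verify the individual files.

```java
import java.io.File;

public class HadoopHomeCheck {

    /**
     * Returns null when the setup looks plausible, otherwise a short
     * description of what seems to be wrong.
     */
    public static String findProblem(String hadoopHome) {
        if (hadoopHome == null || hadoopHome.trim().isEmpty()) {
            return "HADOOP_HOME is not set";
        }
        // The guide above adds %HADOOP_HOME%\bin to the Path, so that folder must exist
        File binDir = new File(hadoopHome, "bin");
        if (!binDir.isDirectory()) {
            return "Missing directory: " + binDir.getPath();
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = findProblem(System.getenv("HADOOP_HOME"));
        System.out.println(problem == null ? "HADOOP_HOME looks usable" : problem);
    }
}
```

Keep in mind that environment variables are read when a process starts, so the IDE usually has to be restarted after changing them.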
Getting Started with Coding:
The first thing you need is a schema. As far as I can tell, Schema.Parser offers no way to build a schema programmatically at run time: its parse() method only takes a file or a string as a parameter and doesn't let you modify the schema once it is created. (Avro's separate SchemaBuilder class is an alternative that I did not use.) To get around this, I generate my schema JSON at run time and parse that. Below is an example schema:
String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
+ "\"type\": \"record\"," //Must be set as record
+ "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
+ "\"fields\": ["
+ " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
+ " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
+ " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
+ " ]}";
Schema.Parser parser = new Schema.Parser().setValidate(true);
Schema avroSchema = parser.parse(schema);
Details on Avro schema can be found here: https://avro.apache.org/docs/1.8.0/spec.html
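Since the schema is just JSON text, generating it at run time comes down to string building. Here is a minimal sketch of that idea using only the JDK. The class name, method name and the Map-based field description are my own assumptions for illustration, and field names are not escaped, so it only suits simple names like the ones in this guide.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class SchemaJsonBuilder {

    /**
     * Builds an Avro record schema as JSON text.
     * The map goes from field name to the field's Avro type, already written
     * as JSON, for example "\"int\"" or "[\"string\", \"null\"]".
     */
    public static String buildRecordSchema(String namespace, String recordName, Map<String, String> fields) {
        StringJoiner fieldJson = new StringJoiner(", ", "[", "]");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            fieldJson.add("{\"name\": \"" + field.getKey() + "\", \"type\": " + field.getValue() + "}");
        }
        return "{\"namespace\": \"" + namespace + "\", "
                + "\"type\": \"record\", "
                + "\"name\": \"" + recordName + "\", "
                + "\"fields\": " + fieldJson + "}";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the fields in insertion order
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("myInteger", "\"int\"");
        fields.put("myString", "[\"string\", \"null\"]");
        System.out.println(buildRecordSchema("org.myorganization.mynamespace", "myrecordname", fields));
    }
}
```

The resulting string can be handed to Schema.Parser's parse() method exactly like the hand-written schema.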
Next we can start generating records (Avro primitive types are simple):
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("myInteger", 1);
record.put("myString", "string value 1");
... fixed_len_byte_array). So we have to use fixed in our case as well (as can be seen in the schema). In Java we must use BigDecimal in order to truly handle decimals. A Decimal(32,4) will never take more than 16 bytes, no matter the value, because its unscaled value has at most 32 digits and 10^32 is smaller than 2^127. So we will use a standard byte array size of 16 in our serialization below (and in the schema above):
BigDecimal myDecimalValue = new BigDecimal("99.9999");
//First we need to make sure the BigDecimal matches our schema scale:
myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
//Next we get the decimal value as one BigInteger (like there was no decimal point)
BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
//Finally we serialize the integer
byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
//We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
byte[] myDecimalBuffer = new byte[16];
if (myDecimalBuffer.length >= decimalBytes.length) {
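//Because we set our fixed byte array size as 16 bytes, we need to
//pad-left our original value's bytes: with zeros for positive values,
//with 0xFF for negative values (two's-complement sign extension)
if (myUnscaledDecimalValue.signum() < 0) {
java.util.Arrays.fill(myDecimalBuffer, (byte) 0xFF);
}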
int myDecimalBufferIndex = myDecimalBuffer.length - 1;
for(int i = decimalBytes.length - 1; i >= 0; i--){
myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
myDecimalBufferIndex--;
}
//Save result
fixed.bytes(myDecimalBuffer);
} else {
throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
}
//We can finally write our decimal to our record
record.put("myDecimal", fixed);
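One detail deserves attention when padding: BigInteger.toByteArray() returns two's-complement bytes, so a negative value has to be left-padded with 0xFF rather than zeros, otherwise it reads back as a large positive number. The sketch below wraps the whole conversion in a reusable helper using only the JDK. The class and method names are my own, not part of Avro or Parquet.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

public class DecimalToFixedBytes {

    /** Converts a decimal into a fixed-size, big-endian two's-complement byte array. */
    public static byte[] toFixedBytes(BigDecimal value, int scale, int size) {
        // Match the schema scale, then drop the decimal point
        BigInteger unscaled = value.setScale(scale, RoundingMode.HALF_UP).unscaledValue();
        byte[] raw = unscaled.toByteArray();
        if (raw.length > size) {
            throw new IllegalArgumentException(String.format(
                    "Decimal size: %d was greater than the allowed max: %d", raw.length, size));
        }
        byte[] fixed = new byte[size];
        // Sign extension: use the sign AFTER rounding, since a tiny negative value can round to zero
        if (unscaled.signum() < 0) {
            Arrays.fill(fixed, (byte) 0xFF);
        }
        // Right-align the significant bytes
        System.arraycopy(raw, 0, fixed, size - raw.length, raw.length);
        return fixed;
    }

    public static void main(String[] args) {
        byte[] bytes = toFixedBytes(new BigDecimal("-99.9999"), 4, 16);
        // Reading the bytes back should give the unscaled value again
        System.out.println(new BigInteger(bytes));
    }
}
```

The returned array can be passed to fixed.bytes(...) in place of the hand-rolled buffer.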
For Date values, Avro specifies that we need to save the number of days since the epoch as an integer. (If you need the time component as well, such as an actual DateTime type, you need to use the Timestamp Avro type, which I will not cover.) The easiest way I found to get the number of days since the epoch is the joda-time library. If you added the hadoop-aws dependency to your project you should already have this library. If not, you will need to add it yourself:
//Get epoch value
MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
DateTime currentDate = new DateTime(); //Can take Java Date in constructor
Days days = Days.daysBetween(epoch, currentDate);
//We can write number of days since epoch into the record
record.put("myDate", days.getDays());
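If you are on Java 8 or later and would rather not depend on joda-time, the JDK's own java.time package can produce the same number. This is an alternative sketch, not what I used above, and the class and method names are made up for the example.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

public class EpochDays {

    /** Number of days between 1970-01-01 and the given date, as the int Avro expects. */
    public static int daysSinceEpoch(LocalDate date) {
        // toEpochDay() returns a long; toIntExact throws instead of silently overflowing
        return Math.toIntExact(date.toEpochDay());
    }

    public static void main(String[] args) {
        // Take "today" in UTC so the result does not depend on the machine's time zone
        System.out.println(daysSinceEpoch(LocalDate.now(ZoneOffset.UTC)));
    }
}
```

The result would go into the record the same way, via record.put("myDate", ...).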
We can finally start writing our Parquet file:
try {
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", "ACCESSKEY");
conf.set("fs.s3a.secret.key", "SECRETKEY");
//Below are some other helpful settings
//conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
//conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
//conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
//conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors
Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
//Use path below to save to local file system instead
//Path path = new Path("data.parquet");
try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
.withSchema(avroSchema)
.withCompressionCodec(CompressionCodecName.GZIP)
.withConf(conf)
.withPageSize(4 * 1024 * 1024) //For compression
.withRowGroupSize(16 * 1024 * 1024) //For write buffering (row group size)
.build()) {
//We only have one record to write in our example
writer.write(record);
}
} catch (Exception ex) { ex.printStackTrace(System.out); }
Here is the data loaded into Apache Spark (2.2.0):
And for your convenience, the entire source code:
package com.mycompany.stackoverflow;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.MutableDateTime;
public class Main {
public static void main(String[] args) {
System.out.println("Start");
String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
+ "\"type\": \"record\"," //Must be set as record
+ "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
+ "\"fields\": ["
+ " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
+ " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
+ " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
+ " ]}";
Schema.Parser parser = new Schema.Parser().setValidate(true);
Schema avroSchema = parser.parse(schema);
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("myInteger", 1);
record.put("myString", "string value 1");
BigDecimal myDecimalValue = new BigDecimal("99.9999");
//First we need to make sure the huge decimal matches our schema scale:
myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
//Next we get the decimal value as one BigInteger (like there was no decimal point)
BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
//Finally we serialize the integer
byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
//We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
byte[] myDecimalBuffer = new byte[16];
if (myDecimalBuffer.length >= decimalBytes.length) {
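//Because we set our fixed byte array size as 16 bytes, we need to
//pad-left our original value's bytes: with zeros for positive values,
//with 0xFF for negative values (two's-complement sign extension)
if (myUnscaledDecimalValue.signum() < 0) {
java.util.Arrays.fill(myDecimalBuffer, (byte) 0xFF);
}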
int myDecimalBufferIndex = myDecimalBuffer.length - 1;
for(int i = decimalBytes.length - 1; i >= 0; i--){
myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
myDecimalBufferIndex--;
}
//Save result
fixed.bytes(myDecimalBuffer);
} else {
throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
}
//We can finally write our decimal to our record
record.put("myDecimal", fixed);
//Get epoch value
MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
DateTime currentDate = new DateTime(); //Can take Java Date in constructor
Days days = Days.daysBetween(epoch, currentDate);
//We can write number of days since epoch into the record
record.put("myDate", days.getDays());
try {
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", "ACCESSKEY");
conf.set("fs.s3a.secret.key", "SECRETKEY");
//Below are some other helpful settings
//conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
//conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
//conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
//conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.
Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
//Use path below to save to local file system instead
//Path path = new Path("data.parquet");
try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
.withSchema(avroSchema)
.withCompressionCodec(CompressionCodecName.GZIP)
.withConf(conf)
.withPageSize(4 * 1024 * 1024) //For compression
.withRowGroupSize(16 * 1024 * 1024) //For write buffering (row group size)
.build()) {
//We only have one record to write in our example
writer.write(record);
}
} catch (Exception ex) {
ex.printStackTrace(System.out);
}
}
}