Spark's int96 time type

When you create a timestamp column in Spark and save it to Parquet, you get a 12-byte integer column type (int96); I gather the data is split into 8 bytes for the nanoseconds within the day and 4 bytes for the Julian day.

This does not conform to any Parquet logical type, so the schema in the Parquet file gives no indication that the column is anything but an integer.

My question is: how does Spark know to load such a column as a timestamp rather than as a big integer?

asked Mar 06 '17 by mdurant

People also ask

What is int96 timestamp?

Timestamps saved as INT96 are made up of the nanoseconds within the day (first 8 bytes) and the Julian day (last 4 bytes).
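To make that layout concrete, here is a minimal sketch of decoding one INT96 value by hand. The helper name int96ToTimestamp is ours, not a Spark API; it assumes the little-endian encoding Spark uses internally, with nanoseconds-of-day in bytes 0-7 and the Julian day number in bytes 8-11:

import java.nio.{ByteBuffer, ByteOrder}
import java.sql.Timestamp

// Hypothetical helper: decode one 12-byte INT96 value, assuming
// little-endian layout with nanos-of-day first and Julian day last.
def int96ToTimestamp(bytes: Array[Byte]): Timestamp = {
  require(bytes.length == 12, "INT96 values are exactly 12 bytes")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val nanosOfDay = buf.getLong       // bytes 0-7
  val julianDay  = buf.getInt        // bytes 8-11
  val julianDayOfEpoch = 2440588     // Julian day of 1970-01-01
  val millis =
    (julianDay - julianDayOfEpoch).toLong * 86400000L + nanosOfDay / 1000000L
  val ts = new Timestamp(millis)
  ts.setNanos((nanosOfDay % 1000000000L).toInt)  // restore sub-ms precision
  ts
}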


How are timestamps stored in parquet?

Timestamp values in Parquet files are saved as INT96 values by data-processing frameworks like Hive and Impala. However, there is a slight difference in the way Hive and Impala write and read these INT96 values, which can cause inconsistencies and data-management issues with the timestamp values.
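Newer Spark versions (2.3+) expose a flag to compensate for that writer difference when reading Impala-written files; a minimal sketch, assuming an existing spark session:

// Adjust INT96 timestamps written by Impala to the session time zone
// on read (available since Spark 2.3; the default is false):
spark.conf.set("spark.sql.parquet.int96TimestampConversion", "true")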

Does parquet support timestamp?

Parquet requires a Hive metastore version of 1.2 or above in order to use TIMESTAMP.
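If you manage the metastore client version yourself, a sketch of pinning it might look like the following (the app name is ours, and spark-hive must be on the classpath):

import org.apache.spark.sql.SparkSession

// Hypothetical setup: pin the Hive metastore client to 1.2+ so
// Parquet TIMESTAMP columns are supported.
val spark = SparkSession.builder()
  .appName("parquet-timestamp-demo")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  .enableHiveSupport()
  .getOrCreate()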


1 Answer

The semantics are determined from the metadata. We'll need some imports:

import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

example data:

val path = "/tmp/ts"

import spark.implicits._  // for toDF and the $"..." column syntax

Seq((1, "2017-03-06 10:00:00")).toDF("id", "ts")
  .withColumn("ts", $"ts".cast("timestamp"))  // string -> TimestampType
  .write.mode("overwrite").parquet(path)

and Hadoop configuration:

val conf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(conf)

Now we can access Spark metadata:

ParquetFileReader
  .readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
  .get(0)                 // footer of the first file under the path
  .getParquetMetadata
  .getFileMetaData
  .getKeyValueMetaData    // key-value metadata Spark writes into the footer
  .get("org.apache.spark.sql.parquet.row.metadata")

and the result is:

String = {"type":"struct","fields: [
  {"name":"id","type":"integer","nullable":false,"metadata":{}},
  {"name":"ts","type":"timestamp","nullable":true,"metadata":{}}]}

Equivalent information can be stored in the Metastore as well.
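As a quick sanity check, reading the file back shows Spark applying this stored schema rather than the raw Parquet physical type:

spark.read.parquet(path).printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- ts: timestamp (nullable = true)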

According to the official documentation, this is used to achieve compatibility with Hive and Impala:

Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.

and can be controlled using the spark.sql.parquet.int96AsTimestamp property.
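A minimal sketch of toggling it (it defaults to true, which is why INT96 columns come back as timestamps out of the box):

// Default is true; if set to false, Spark SQL stops interpreting
// INT96 data as timestamps.
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")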

answered Oct 20 '22 by zero323