I have a Hive table built on top of a number of external Parquet files. The Parquet summary metadata files should have been generated by the Spark job, but because the metadata flag was set to false they were not. I'm wondering whether there is a painless way to restore them. The structure of the files is as follows:
/apps/hive/warehouse/test_db.db/test_table/_SUCCESS
/apps/hive/warehouse/test_db.db/test_table/_common_metadata
/apps/hive/warehouse/test_db.db/test_table/_metadata
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-20
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-21
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-22
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-23
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-24
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-25
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-26
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-27
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-28
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-29
/apps/hive/warehouse/test_db.db/test_table/end_date=2016-04-30
Let's assume that the _metadata file is missing or outdated. Is there a way to recreate it via a Hive command, or otherwise regenerate it, without having to rerun the whole Spark job?
OK, so here is the drill: the metadata can be accessed directly using Parquet tools. You'll need to get the footers of your Parquet files first:
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration

val conf = spark.sparkContext.hadoopConfiguration

// Read the footers of all Parquet files found under the given path.
def getFooters(conf: Configuration, path: String) = {
  val fs = FileSystem.get(conf)
  val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
  footers
}
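For example, for the table from the question you would point it at the warehouse directory shown in the listing above:

// Footers for every Parquet file under the table's warehouse directory
val footers = getFooters(conf, "/apps/hive/warehouse/test_db.db/test_table")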
Now you can get your file metadata as follows:
// Extract the key/value metadata from each footer as a Scala map.
def getFileMetadata(conf: Configuration, path: String) = {
  getFooters(conf, path)
    .asScala
    .map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
}
With that in place you can inspect the metadata of your Parquet files:
getFileMetadata(conf, "/tmp/foo").headOption

// Option[scala.collection.mutable.Map[String,String]] =
//   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
//     {"type":"struct","fields":[
//       {"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},
//       {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
We can also use the extracted footers to write a standalone metadata file when needed:
import org.apache.parquet.hadoop.ParquetFileWriter

// Write a summary _metadata file for the given path from its footers.
def createMetadata(conf: Configuration, path: String) = {
  val footers = getFooters(conf, path)
  ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
}
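For example, you could rebuild the summary file for the table from the question, and, assuming the flag mentioned in the question is Parquet's parquet.enable.summary-metadata setting, re-enable it so future writes produce the file again (a sketch, not verified against your job):

// Rebuild the _metadata summary file for the table directory
createMetadata(conf, "/apps/hive/warehouse/test_db.db/test_table")

// Assumption: the "metadata flag" from the question is parquet.enable.summary-metadata;
// turning it back on makes subsequent Spark writes generate the summary files again.
spark.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "true")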
I hope this answers your question. You can read more about Spark DataFrames and Metadata on awesome-spark's spark-gotchas repo.