
Disable parquet metadata summary in Spark

I have a Spark job (on 1.4.1) receiving a stream of Kafka events. I would like to save them continuously as Parquet on Tachyon.

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.window(Seconds(1), Seconds(1)).foreachRDD { (rdd, time) =>
  if (rdd.count() > 0) {
    // Truncate the batch time to the start of the day (86400000 ms = 1 day),
    // so every batch from the same day appends to the same Parquet directory.
    val mil = time.floor(Duration(86400000)).milliseconds
    hiveContext.read.json(rdd).toDF().write.mode(SaveMode.Append).parquet(s"tachyon://192.168.1.12:19998/persisted5$mil")
    hiveContext.sql(s"CREATE TABLE IF NOT EXISTS persisted5$mil USING org.apache.spark.sql.parquet OPTIONS ( path 'tachyon://192.168.1.12:19998/persisted5$mil')")
  }
}

However, I see that as time goes on, on every Parquet write Spark opens each of the one-second Parquet part files, which gets slower and slower:

15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-db03b24d-6f98-4b5d-bb40-530f35b82633.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-3a7857e2-0435-4ee0-ab2c-6d40224f8842.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-47ff2ac1-da00-4473-b3f7-52640014bc5b.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-61625436-7353-4b1e-bb8d-e8afad3a582e.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-e711aa9a-9bf5-41d5-8523-f5edafa69626.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-4e0cca38-cf75-4771-8965-20a30c863100.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-d1510ed4-2c99-43e2-b3d1-38d3d54e626d.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-022d1918-392d-433f-a7f4-074e46b4460f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-cf71f5d2-ba0e-4729-9aa1-41dad5d1d08f.gz.parquet, 65536)
15/08/22 22:04:05 INFO : open(tachyon://192.168.1.12:19998/persisted51440201600000/part-r-00000-ce990b1e-82cc-4feb-a162-ac3ddc275609.gz.parquet, 65536)

I came to the conclusion that this is due to the update of the summary metadata files, which I believe Spark does not make use of, so I would like to disable it.

The Parquet sources show that I should be able to set "parquet.enable.summary-metadata" to false.

I have tried setting it like this, right after creating the hiveContext:

hiveContext.sparkContext.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
hiveContext.sparkContext.hadoopConfiguration.setInt("parquet.metadata.read.parallelism", 10) 

but without success. I also still see logs showing a read parallelism of 5 (the default).

What is the correct way to disable Parquet summary metadata in Spark?

asked Aug 22 '15 by Pierre Lacave


2 Answers

setting "parquet.enable.summary-metadata" as text ("false" and not false) seems to work for us.

By the way, Spark does use the _common_metadata file (we copy it over manually for repetitive jobs).
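A minimal sketch of what that looks like, assuming a hiveContext set up as in the question (this just restates the suggestion above, with the property set as a string instead of via setBoolean):

// Set the switch as a string value on the underlying Hadoop configuration,
// right after creating the hiveContext and before any Parquet writes.
hiveContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")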

answered Oct 19 '22 by Arnon Rotem-Gal-Oz


Spark 2.0 doesn't save metadata summaries by default any more, see SPARK-15719.

If you are working with data hosted in S3, you may still find Parquet performance hurt by Parquet itself trying to scan the tail of all the objects to check their schemas. That can be disabled explicitly:

sparkConf.set("spark.sql.parquet.mergeSchema", "false")
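
If you prefer to scope this to a single read rather than set it globally, the same flag can also be passed as a Parquet read option; a minimal sketch for a Spark 2.x SparkSession (the bucket path below is just a placeholder):

// Disable schema merging for this one read only; the S3 path is a placeholder.
val df = spark.read
  .option("mergeSchema", "false")
  .parquet("s3a://bucket/path/")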
answered Oct 19 '22 by stevel