Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark SQL unable to complete writing Parquet data with a large number of shards

I am trying to use Apache Spark SQL to etl json log data in S3 into Parquet files also on S3. My code is basically:

import org.apache.spark._
val sqlContext = sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")

This code works when I have up to 2000 partitions and fails for 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions to an acceptable number, but this is a very large data set and at 2000 partitions I hit the problem describe in this question

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
        at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
        at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $line37.$read$$iwC$$iwC.<init>(<console>:58)
        at $line37.$read$$iwC.<init>(<console>:60)
        at $line37.$read.<init>(<console>:62)
        at $line37.$read$.<init>(<console>:66)
        at $line37.$read$.<clinit>(<console>)
        at $line37.$eval$.<init>(<console>:7)
        at $line37.$eval$.<clinit>(<console>)
        at $line37.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
        at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

I am running this on spark-1.1.0 on an an R3.xlarge in ec2. I am using the spark-shell console to run the above code. I am able to perform non trivial queries on the data SchemaRDD object afterwards, so it does not appear to be a resource issue. It is also possible to read and query the resulting Parquet file, it just takes an extremely long in due to the lack of summary files.

like image 664
Daniel Mahler Avatar asked Oct 10 '14 02:10

Daniel Mahler


People also ask

Why Parquet is best fit for spark?

Parquet has higher execution speed compared to other standard file formats like Avro,JSON etc and it also consumes less disk space in compare to AVRO and JSON.

Does spark support Parquet?

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.

What is parquet file format example?

What is Parquet? Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.


1 Answers

Try to set this property as false :

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
like image 184
morfious902002 Avatar answered Nov 16 '22 00:11

morfious902002