I am trying to use Apache Spark SQL to ETL JSON log data in S3 into Parquet files, also on S3. My code is basically:
import org.apache.spark._
val sqlContext = new sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")
This code works when I have up to 2000 partitions but fails for 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions down to an acceptable number (roughly as sketched below), but this is a very large data set, and at 2000 partitions I hit the problem described in this question.
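For reference, the coalesce-based workaround would look something like this (a sketch only; 500 is an arbitrary, hypothetical target partition count, and it relies on SchemaRDD.coalesce preserving the schema in Spark 1.1):

// Reduce the number of partitions, and therefore output files, before writing Parquet
val coalesced = data.coalesce(500)
coalesced.saveAsParquetFile("s3n://...")

At 5000 or more partitions, the run produces the output below: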
14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
at $line37.$read$$iwC$$iwC.<init>(<console>:58)
at $line37.$read$$iwC.<init>(<console>:60)
at $line37.$read.<init>(<console>:62)
at $line37.$read$.<init>(<console>:66)
at $line37.$read$.<clinit>(<console>)
at $line37.$eval$.<init>(<console>:7)
at $line37.$eval$.<clinit>(<console>)
at $line37.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I am running this on spark-1.1.0 on an R3.xlarge in EC2, using the spark-shell console to run the above code. I am able to perform non-trivial queries on the data SchemaRDD object afterwards, so it does not appear to be a resource issue.
It is also possible to read and query the resulting Parquet files; it just takes an extremely long time due to the lack of summary files.
Apache Parquet is an open-source, column-oriented file format designed for efficient data storage and retrieval; it provides efficient compression and encoding schemes, so it generally offers faster query execution and smaller files than row-oriented formats such as Avro or JSON. Spark SQL supports both reading and writing Parquet files and automatically preserves the schema of the original data.
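As a quick illustration of that schema preservation, the Parquet output can be read back and queried directly. A minimal sketch, assuming the Spark 1.1 SQLContext API and a hypothetical table name logs:

// Load the Parquet data; the schema is recovered from the Parquet file metadata
val parquetData = sqlContext.parquetFile("s3n://...")
// Register it under a hypothetical table name and query it with SQL
parquetData.registerTempTable("logs")
sqlContext.sql("SELECT COUNT(*) FROM logs").collect()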
Try setting this property to false:
sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
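In the Scala spark-shell used in the question, the equivalent would be roughly the following (a sketch, assuming the same session, variables, and paths as above):

// Skip writing the Parquet summary metadata, so the commit step no longer
// has to read every part-file footer back from S3 (the call that hits the NPE above)
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
data.saveAsParquetFile("s3n://...")

The trade-off is that no summary files are produced at all, which is effectively the situation already described in the question.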