I have been using Spark 1.3.0 to write Parquet files to an OpenStack Swift object store for a while now. I use around 12 partitions for the Parquet data, which writes each file in several parts on Swift. Writing works without any problem, but when I try to read the data back via Spark I get this error:
ERROR Executor: Exception in task 9.0 in stage 2.0 (TID 22)
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:730)
at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
I am using the Hadoop-Swift patch available at https://github.com/openstack/sahara-extra/tree/master/hadoop-swiftfs, which enables Hadoop to recognize Swift as a filesystem.
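For context, the Swift filesystem is wired up through the Hadoop configuration, roughly as follows (a minimal sketch: the provider name "myprovider", the auth URL and the credentials are placeholders, and the exact set of properties may vary with the version of the Swift filesystem patch you use):

// Sketch: register the Swift filesystem on the SparkContext's Hadoop configuration.
// "myprovider" and all values below are placeholders, not real settings.
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.swift.impl", "org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem")
hadoopConf.set("fs.swift.service.myprovider.auth.url", "http://<keystone-host>:5000/v2.0/tokens")
hadoopConf.set("fs.swift.service.myprovider.tenant", "<tenant>")
hadoopConf.set("fs.swift.service.myprovider.username", "<user>")
hadoopConf.set("fs.swift.service.myprovider.password", "<password>")
hadoopConf.set("fs.swift.service.myprovider.public", "true")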
Note 1: If I download the file from Swift to the local filesystem, Spark can read it perfectly.
Note 2: I have also noticed that if I don't partition the Parquet file, reading works fine.
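For reference, this is roughly how I write and read the files (a minimal sketch; the swift:// container and provider names are placeholders):

// Writing: repartitioning to ~12 partitions produces several part files on Swift.
val out = "swift://mycontainer.myprovider/data/events.parquet"  // placeholder URL
df.repartition(12).saveAsParquetFile(out)
// Reading back is where the EOFException above is thrown:
val readBack = sqlContext.parquetFile(out)
readBack.count()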
Is anyone else using Spark with OpenStack Swift and has run into this?
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
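For example (a sketch with made-up column names), the schema round-trips automatically because it is stored in the Parquet file footer:

// The column names and types written here are recovered on read without any extra metadata.
val people = sqlContext.createDataFrame(Seq(("alice", 29), ("bob", 41))).toDF("name", "age")
people.saveAsParquetFile("people.parquet")
sqlContext.parquetFile("people.parquet").printSchema()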
Parquet typically gives faster query execution than other common file formats such as Avro and JSON, and it also consumes less disk space than either.
Parquet really excels when the query is on sparse data or selects only a small subset of columns. It is especially good for queries that read particular columns from a “wide” table (one with many columns), since only the needed columns are read and I/O is minimized.
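As an illustration of that column pruning (a sketch; the path and column names are placeholders):

// Only the "user_id" and "ts" columns are read from the files; the remaining columns
// of the wide table are skipped thanks to Parquet's columnar layout.
val wide = sqlContext.parquetFile("swift://mycontainer.myprovider/wide_table.parquet")
wide.select("user_id", "ts").count()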
I have been using Spark 1.3.0 with OpenStack Swift as my storage backend as well and ran into exactly the same problem.
Using Spark's cache functionality provided a workable workaround while waiting for an actual fix in Hadoop's Swift filesystem support:
// Spark 1.3.0 API; persist() keeps the DataFrame in memory after the first read from Swift
val df = sqlContext.parquetFile("your_swift_url").persist()
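Note that persist() is lazy; the data is only cached once an action forces it to be read, so with this workaround you would typically trigger one action up front:

df.count()  // forces the first read from Swift and materializes the cache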