 

Spark Swift Integration Parquet

I have been using Spark 1.3.0 to write Parquet files to an OpenStack Swift object store for a while now. I use around 12 partitions, so each Parquet file is written in several parts on Swift. There is no problem writing the files.
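
The write side looks roughly like this (a sketch; the container, provider, and path names are placeholders for my actual Swift URL):

    // Spark 1.3 DataFrame API: repartition to ~12 parts and write Parquet to Swift
    val out = df.repartition(12)
    out.saveAsParquetFile("swift://container.provider/data/table.parquet")

But when I try to read the data back via Spark, I get this error: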

    ERROR Executor: Exception in task 9.0 in stage 2.0 (TID 22)
    java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:730)
    at parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:490)
    at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:116)
    at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
    at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

I am using the Hadoop-Swift patch available at https://github.com/openstack/sahara-extra/tree/master/hadoop-swiftfs, which enables Hadoop to recognize Swift as a filesystem.
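
For completeness, my Swift configuration is set up roughly like this (a sketch; the service name "provider" and the credentials are placeholders for my real Keystone endpoint and account):

    // Register the Swift filesystem driver and a Keystone v2 endpoint.
    // "provider" must match the host part of swift://container.provider/... URLs.
    val conf = sc.hadoopConfiguration
    conf.set("fs.swift.impl", "org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem")
    conf.set("fs.swift.service.provider.auth.url", "https://keystone.example.com:5000/v2.0/tokens")
    conf.set("fs.swift.service.provider.tenant", "my-tenant")
    conf.set("fs.swift.service.provider.username", "my-user")
    conf.set("fs.swift.service.provider.password", "my-password")
    conf.set("fs.swift.service.provider.public", "true")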

Note 1: If I download the file from Swift to the local filesystem, Spark can read it perfectly.

Note 2: I have also noticed that if I don't partition the Parquet file, it reads back fine.
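
Following Note 2, my current stopgap is to collapse the data into a single partition before writing, so the file is not split into parts on Swift (a sketch; only viable while the data fits comfortably in one partition):

    // Workaround from Note 2: write a single part file instead of 12.
    // repartition(1) forces a shuffle but avoids the multi-part layout.
    df.repartition(1)
      .saveAsParquetFile("swift://container.provider/data/table_single.parquet")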

Is anyone else using Spark with OpenStack Swift and has run into this?

asked Sep 04 '15 by apurva.nandan
People also ask

Does Spark support Parquet?

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files, automatically preserving the schema of the original data.

Why is Parquet a good fit for Spark?

Parquet offers higher execution speed than other standard file formats such as Avro and JSON, and it also consumes less disk space than Avro and JSON.

Is Parquet good for sparse data?

Parquet excels when a query touches sparse data or selects only a few columns. It is especially good for queries that read particular columns from a "wide" (many-column) table, since only the needed columns are read and I/O is minimized.
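
As a quick illustration of that column pruning (a sketch using the Spark 1.3 API; the path and column names are placeholders):

    // Only the "name" and "age" columns are read from the Parquet file;
    // every other column in the wide table is skipped, minimizing I/O.
    val people = sqlContext.parquetFile("swift://container.provider/data/people.parquet")
    people.select("name", "age").show()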


1 Answer

I have been using Spark 1.3.0 with OpenStack Swift as my storage backend as well, and I encountered the exact same problem.

Using Spark's caching functionality provided a valid workaround while waiting for an actual fix in the Swift support of hadoop-cloud.

    // Persist after reading so later actions hit the cache, not Swift
    val df = spark.read.parquet("your_swift_url").persist()
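
Note that spark.read.parquet is the newer DataFrameReader API; on Spark 1.3.0 itself the equivalent is roughly (a sketch; the URL is a placeholder):

    // Spark 1.3-era equivalent: parquetFile + cache
    val df = sqlContext.parquetFile("swift://container.provider/data/table.parquet").cache()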

answered Sep 22 '22 by Fractal