 

Parquet error when saving from Spark

After repartitioning a DataFrame in Spark 1.3.0 I get a Parquet exception when saving it to Amazon S3.

logsForDate
    .repartition(10)
    .saveAsParquetFile(destination) // <-- Exception here

The exception I receive is:

java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I would like to know what the problem is and how to solve it.

Asked Apr 30 '15 by Interfector

People also ask

Does Spark support Parquet?

Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
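For example, here is a minimal sketch of that round trip with the Spark 1.3-era API (the paths and the "status" column are placeholders, not taken from the question):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-example"))
val sqlContext = new SQLContext(sc)

// Reading Parquet yields a DataFrame whose schema comes from the file metadata.
val df = sqlContext.parquetFile("hdfs:///data/logs.parquet")
df.printSchema()

// Writing it back out preserves that schema automatically.
df.filter(df("status") === 200)
  .saveAsParquetFile("hdfs:///data/logs-filtered.parquet")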

What is the default compression for Parquet?

GZIP is the default write compression format for files in the Parquet and text file storage formats. Files in the tar.gz format are not supported. LZ4 – this member of the Lempel-Ziv 77 (LZ77) family also focuses on compression and decompression speed rather than maximum compression of data.
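If the default does not suit you, Spark lets you pick the Parquet codec explicitly. A hedged sketch, reusing logsForDate and destination from the question (the exact default codec varies by Spark version, so check your release):

// Supported values in Spark 1.x include "uncompressed", "snappy", "gzip" and "lzo".
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
logsForDate.saveAsParquetFile(destination)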


2 Answers

I can actually reproduce this problem with Spark 1.3.1 on EMR, when saving to S3.

However, saving to HDFS works fine. You could save to HDFS first, and then use e.g. s3distcp to move the files to S3.
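A rough sketch of that workaround (the HDFS path and S3 bucket are placeholders; s3-dist-cp is the EMR-bundled variant of s3distcp):

// Step 1: write the Parquet output to HDFS first, where the write completes reliably.
logsForDate
  .repartition(10)
  .saveAsParquetFile("hdfs:///tmp/logsForDate.parquet")

// Step 2: once the job succeeds, copy the files to S3 from the shell, e.g.:
//   s3-dist-cp --src hdfs:///tmp/logsForDate.parquet --dest s3://my-bucket/logsForDate.parquet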

Answered by Eric Eijkelenboom


I faced this error when calling saveAsParquetFile to HDFS. It was caused by a datanode socket write timeout, so I changed it to a longer value in the Hadoop settings:

<property>
  <name>dfs.datanode.socket.write.timeout</name>
  <value>3000000</value>
</property>
<property>
  <name>dfs.socket.timeout</name>
  <value>3000000</value>
</property> 

Hope this helps; perhaps you can configure a similar timeout on the S3 side.
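If editing the Hadoop XML configuration is not convenient, the same values can also be set programmatically on the job's Hadoop configuration. A small sketch mirroring the XML above (values in milliseconds):

// Apply the longer timeouts on the SparkContext's Hadoop configuration before writing.
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "3000000")
sc.hadoopConfiguration.set("dfs.socket.timeout", "3000000")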

Answered by yjshen