After repartitioning a DataFrame in Spark 1.3.0, I get a Parquet exception when saving to Amazon S3.
logsForDate
.repartition(10)
.saveAsParquetFile(destination) // <-- Exception here
The exception I receive is:
java.io.IOException: The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: COLUMN
at parquet.hadoop.ParquetFileWriter$STATE.error(ParquetFileWriter.java:137)
at parquet.hadoop.ParquetFileWriter$STATE.startBlock(ParquetFileWriter.java:129)
at parquet.hadoop.ParquetFileWriter.startBlock(ParquetFileWriter.java:173)
at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:152)
at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
at org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:635)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:649)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I would like to know what the problem is and how to solve it.
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data.
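For example, a file written with saveAsParquetFile can be read back with its schema intact. A minimal sketch against the Spark 1.3 API, where sqlContext and the path are placeholders:

// Read the Parquet output back; the schema is recovered from the Parquet metadata.
val restored = sqlContext.parquetFile("hdfs:///tmp/logsForDate.parquet")
restored.printSchema()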
I can actually reproduce this problem with Spark 1.3.1 on EMR, when saving to S3.
However, saving to HDFS works fine. You could save to HDFS first and then use, for example, s3distcp to move the files to S3, as sketched below.
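A minimal sketch of that workaround, reusing the logsForDate DataFrame from the question; the HDFS staging path, the bucket name, and the s3distcp jar location (which varies by EMR release) are assumptions for illustration:

// 1) Write the Parquet output to HDFS instead of S3
logsForDate
.repartition(10)
.saveAsParquetFile("hdfs:///tmp/logsForDate.parquet")

// 2) Then copy the directory to S3 from the EMR master node, e.g. with s3distcp:
//    hadoop jar /home/hadoop/lib/emr-s3distcp-1.0.jar \
//      --src hdfs:///tmp/logsForDate.parquet \
//      --dest s3://my-bucket/logsForDate.parquet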
I ran into this error when calling saveAsParquetFile against HDFS. It was caused by the datanode socket write timeout, so I changed it to a longer value in the Hadoop settings:
<property>
<name>dfs.datanode.socket.write.timeout</name>
<value>3000000</value>
</property>
<property>
<name>dfs.socket.timeout</name>
<value>3000000</value>
</property>
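If editing hdfs-site.xml is not convenient, the same properties can also be set programmatically on the SparkContext's Hadoop configuration. A minimal sketch, assuming sc is the active SparkContext:

// Same timeout properties, set on the SparkContext's Hadoop configuration
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "3000000")
sc.hadoopConfiguration.set("dfs.socket.timeout", "3000000")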
Hope this helps if you can apply a similar setting for S3.