
How to overwrite the output directory in Spark

Tags:

apache-spark

People also ask

How do I overwrite a parquet file?

You can also use df.write.mode(mode: String).parquet(path), where mode: String can be "overwrite", "append", "ignore", or "error".
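
For example, a minimal self-contained sketch (the app name and output path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("OverwriteParquet").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(1, 2, 3).toDF("n")
// "overwrite" replaces existing data at the path; "append", "ignore" and "error" are the other modes.
df.write.mode("overwrite").parquet("/tmp/output/parquet")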

Which save mode will raise an error message if data already exists in target path?

ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
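
As a sketch, reusing the df and spark from the example above, the default mode throws when the target already holds data:

import org.apache.spark.sql.SaveMode

// SaveMode.ErrorIfExists (the default) raises an AnalysisException
// if /tmp/output/parquet already contains data.
df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/output/parquet")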

What is truncate false in Spark?

The following answer applies to a Spark Streaming application. By setting the "truncate" option to false, you tell the console output sink to display full column contents rather than truncating them.
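
A minimal sketch with the Structured Streaming console sink, reusing the spark session from the first sketch above (the rate source is just a demo stream):

val stream = spark.readStream.format("rate").load()
stream.writeStream
  .format("console")
  .option("truncate", "false") // print full column values instead of cutting them off
  .start()
  .awaitTermination()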


UPDATE: I suggest using DataFrames, plus something like ... .write.mode(SaveMode.Overwrite) ....

A handy "pimp my library" implicit class:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SaveMode, SparkSession}

implicit class PimpedStringRDD(rdd: RDD[String]) {
  // Write the RDD as a text file at path p, overwriting any existing output.
  def write(p: String)(implicit ss: SparkSession): Unit = {
    import ss.implicits._
    rdd.toDF().as[String].write.mode(SaveMode.Overwrite).text(p)
  }
}
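
Usage might look like this (a sketch; the session, data, and path are placeholders):

implicit val ss: SparkSession =
  SparkSession.builder().appName("PimpDemo").master("local[*]").getOrCreate()

val lines: RDD[String] = ss.sparkContext.parallelize(Seq("foo", "bar"))
lines.write("/tmp/lines-out") // replaces /tmp/lines-out if it already exists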

For older versions, try

yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(yourSparkConf)

In 1.1.0 you can set conf settings using the spark-submit script with the --conf flag, e.g. --conf spark.hadoop.validateOutputSpecs=false.

WARNING (older versions): According to @piggybox there is a bug in Spark where it will only overwrite the files it needs to write its part- files; any other files in the output directory will be left unremoved.


Since df.save(path, source, mode) is deprecated (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame), use

df.write.format(source).mode("overwrite").save(path)

where df.write is a DataFrameWriter.

'source' can be ("com.databricks.spark.avro" | "parquet" | "json")
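
For instance, with a parquet source (a sketch; df and the path are placeholders):

// Equivalent to the deprecated df.save("/tmp/output", "parquet", SaveMode.Overwrite):
df.write.format("parquet").mode("overwrite").save("/tmp/output")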


From the pyspark.sql.DataFrame.save documentation (currently at 1.3.1), you can specify mode='overwrite' when saving a DataFrame:

myDataFrame.save(path='myPath', source='parquet', mode='overwrite')

I've verified that this will even remove left over partition files. So if you had say 10 partitions/files originally, but then overwrote the folder with a DataFrame that only had 6 partitions, the resulting folder will have the 6 partitions/files.

See the Spark SQL documentation for more information about the mode options.


The documentation for the parameter spark.files.overwrite says this: "Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source." So it has no effect on the saveAsTextFile method.

You could do this before saving the file:

// Obtain a handle on the target filesystem (here an HDFS instance at
// localhost:9000) and recursively delete the output path if it exists,
// swallowing any error (e.g. when the path is absent).
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _: Throwable => }

As explained here: http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html


df.write.mode('overwrite').parquet("/output/folder/path") works if you want to overwrite a parquet file using Python. This is in Spark 1.6.2; the API may differ in later versions.


val jobName = "WordCount"
// Overwrite the output directory in Spark: set("spark.hadoop.validateOutputSpecs", "false")
val conf = new SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
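
With validation disabled, a subsequent saveAsTextFile to an already existing directory no longer fails (a sketch; the path is illustrative, and note the warning above about stale part- files):

sc.parallelize(Seq("hello", "world")).saveAsTextFile("/tmp/wordcount-out")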