 

Spark Standalone Mode: How to compress spark output written to HDFS

Related to my other question, but distinct:

someMap.saveAsTextFile("hdfs://HOST:PORT/out")

If I save an RDD to HDFS, how can I tell Spark to compress the output with gzip? In Hadoop, it is possible to set

mapred.output.compress = true

and choose the compression algorithm with

mapred.output.compression.codec = <<classname of compression codec>>
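For reference, this is roughly how those two properties would be set on a plain Hadoop JobConf in a MapReduce job (a sketch of the Hadoop side only, not Spark):

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.JobConf

// Enable output compression and pick gzip as the codec (old mapred API)
val jobConf = new JobConf()
jobConf.setBoolean("mapred.output.compress", true)
jobConf.set("mapred.output.compression.codec", classOf[GzipCodec].getCanonicalName)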

How would I do this in Spark? Will the Hadoop settings above work there as well?

Edit: I am using Spark 0.7.2.

asked Jun 21 '13 by ptikobj


4 Answers

The method saveAsTextFile takes an additional optional parameter of the codec class to use. So for your example it should be something like this to use gzip:

import org.apache.hadoop.io.compress.GzipCodec

someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])
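Any other Hadoop CompressionCodec class can be passed the same way; a minimal sketch (the output paths here are illustrative):

import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec}

// Same call, different codec class: the file extension (.gz / .bz2) follows the codec
someMap.saveAsTextFile("hdfs://HOST:PORT/out-gz", classOf[GzipCodec])
someMap.saveAsTextFile("hdfs://HOST:PORT/out-bz2", classOf[BZip2Codec])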

UPDATE

Since you're using 0.7.2, you might be able to port the compression settings over via configuration options that you set at startup. I'm not sure whether this will work exactly, but you need to go from this:

conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

to something like this:

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")
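The properties have to be in place before the SparkContext is created; a minimal driver sketch under that assumption (the master URL, app name and sample data are illustrative, and whether 0.7.2 actually forwards spark.hadoop.* properties is exactly the open question above):

import spark.SparkContext  // the package was `spark` before the move to org.apache.spark

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")

// Create the context only after the properties are set
val sc = new SparkContext("local", "compression-example")
sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("hdfs://HOST:PORT/out")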

If you get it to work, posting your config would probably be helpful to others as well.

answered Oct 22 '22 by Noah


Another way to save gzipped files to HDFS or Amazon S3 is to use the saveAsHadoopFile method.

saveAsHadoopFile expects a pair RDD: someMap should be an RDD[(K, V)]. If you only have an RDD[V], call someMap.map(line => (line, "")) first (as in the sketch below) and then use saveAsHadoopFile.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])
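Putting the two steps together, a hedged sketch for an RDD of plain strings (the `lines` RDD and output path are illustrative, and plain TextOutputFormat is used here instead of MultipleTextOutputFormat):

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat

// `lines` is assumed to be an existing RDD[String]
lines
  .map(line => (line, ""))  // key = the line, value = empty string
  .saveAsHadoopFile(
    "hdfs://HOST:PORT/out",
    classOf[String],
    classOf[String],
    classOf[TextOutputFormat[String, String]],
    classOf[GzipCodec])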
answered Oct 22 '22 by Gongqin Shen


For newer Spark releases, do the following in your spark-defaults.xml file (the old mapred.* properties are deprecated in favor of mapreduce.*):

<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
</property>
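The same mapreduce.* properties can also be passed through Spark's spark.hadoop.* prefix instead of a configuration file; a minimal sketch (the app name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("compression-example")
  .config("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
  .config("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec")
  .config("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
  .getOrCreate()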
answered Oct 22 '22 by nikk


This is the simplest/shortest way to enable compression quickly for almost all versions of Spark.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType

/**
 * Set compression configurations on a Hadoop `Configuration`.
 * `codec` should be a full class path.
 */
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
  if (codec != null) {
    conf.set("mapreduce.output.fileoutputformat.compress", "true")
    conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) // "BLOCK" as string
    conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
    conf.set("mapreduce.map.output.compress", "true")
    conf.set("mapreduce.map.output.compress.codec", codec)
  } else {
    // This implies the `compression` option is set to `uncompressed` or `none`.
    conf.set("mapreduce.output.fileoutputformat.compress", "false")
    conf.set("mapreduce.map.output.compress", "false")
  }
}

Here, conf is spark.sparkContext.hadoopConfiguration.
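For example, a hedged usage sketch (`spark` and `rdd` are assumed to be an existing SparkSession and RDD[String]; the gzip class path follows the method's full-class-path requirement):

// configure gzip on the shared Hadoop configuration, then write as usual
setCodecConfiguration(spark.sparkContext.hadoopConfiguration, "org.apache.hadoop.io.compress.GzipCodec")
rdd.saveAsTextFile("hdfs://HOST:PORT/out")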

The codec String parameter of the method above accepts the following options (see the mapping sketch below for the corresponding codec classes):

 1. none
 2. uncompressed
 3. bzip2
 4. deflate
 5. gzip
 6. lz4
 7. snappy
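Since the doc comment asks for a full class path, a small lookup like the one below can translate those short names (a hypothetical helper; the class names are the standard Hadoop codec classes, and the mapping loosely mirrors what Spark does internally):

// Hypothetical short-name-to-class mapping; "none"/"uncompressed" mean no codec at all
val codecClassByShortName: Map[String, Option[String]] = Map(
  "none"         -> None,
  "uncompressed" -> None,
  "bzip2"        -> Some("org.apache.hadoop.io.compress.BZip2Codec"),
  "deflate"      -> Some("org.apache.hadoop.io.compress.DeflateCodec"),
  "gzip"         -> Some("org.apache.hadoop.io.compress.GzipCodec"),
  "lz4"          -> Some("org.apache.hadoop.io.compress.Lz4Codec"),
  "snappy"       -> Some("org.apache.hadoop.io.compress.SnappyCodec"))

setCodecConfiguration(conf, codecClassByShortName("gzip").orNull)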
answered Oct 22 '22 by Ram Ghadiyaram