 

Write to multiple outputs by key Spark - one Spark job

How can you write to multiple outputs, dependent on the key, using Spark in a single job?

Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job

E.g.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)

would ensure cat prefix/1 is

a
b

and cat prefix/2 would be

c

EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.

asked Jun 02 '14 by samthebest

4 Answers

If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.)

If you're starting out with an RDD, you'll first need to convert it to a DataFrame:

import sqlContext.implicits._ // needed for .toDF on an RDD

val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")

In Python, this same code is:

people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])

Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:

people_df.write.partitionBy("number").text("people")

And you can easily use other output formats if you want:

people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")

In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:

people/
  _SUCCESS
  number=1/
    part-abcd
    part-efgh
  number=2/
    part-abcd
    part-efgh
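
As a side note, because this is the standard Hive-style partition layout, partition discovery should recover the number column when you read the output back. A minimal sketch, assuming the Spark 1.6+ text reader:

// Hedged sketch: read the partitioned text output back; partition discovery
// should add the "number" column alongside the text "value" column.
val people_back = sqlContext.read.text("people")
people_back.printSchema()  // expect: value: string, number: integer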

answered by Nick Chammas


I would do it like this, which is scalable:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

import org.apache.spark._
import org.apache.spark.SparkContext._

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = 
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Split" + args(1))
    val sc = new SparkContext(conf)
    sc.textFile("input/path")
      .map(a => (k, v)) // Your own implementation: derive a (key, value) pair from each line
      .partitionBy(new HashPartitioner(num)) // num = the number of partitions you want (see below)
      .saveAsHadoopFile("output/path", classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

I just saw a similar answer above, but we don't actually need custom partitioners. MultipleTextOutputFormat will create one file per key. It is fine for multiple records with the same key to fall into the same partition.

new HashPartitioner(num), where num is the number of partitions you want. If you have a large number of distinct keys, you can set num to a large value; that way each partition will not open too many HDFS file handles.
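
If you would rather keep one directory per key, with Hadoop's usual part-NNNNN file names inside (as the other answers produce), a small variation of the output format should do it. This is a sketch, with an illustrative class name:

class RDDPerKeyDirTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  // Put each key in its own subdirectory and keep Hadoop's default part name,
  // e.g. output/path/<key>/part-00000
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/" + name
}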

answered by zhang zhan


If you potentially have many values for a given key, I think the scalable solution is to write out one file per key per partition. Unfortunately there is no built-in support for this in Spark, but we can whip something up.

sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
  .mapPartitionsWithIndex { (p, it) =>
    val outputs = new MultiWriter(p.toString)
    for ((k, v) <- it) {
      outputs.write(k.toString, v)
    }
    outputs.close
    Nil.iterator
  }
  .foreach((x: Nothing) => ()) // To trigger the job.

// This one is Local, but you could write one for HDFS
class MultiWriter(suffix: String) {
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val f = new java.io.File("output/" + key + "/" + suffix)
      f.getParentFile.mkdirs
      writers(key) = new java.io.PrintWriter(f)
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}

(Replace PrintWriter with your choice of distributed filesystem operation.)
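
For instance, a rough HDFS-backed variant using Hadoop's FileSystem API might look like the sketch below; the class name and the hard-coded output/ prefix are illustrative, and error handling is omitted:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Rough sketch of an HDFS-backed MultiWriter, to drop into the
// mapPartitionsWithIndex example above in place of MultiWriter.
class HdfsMultiWriter(suffix: String) {
  // Picks up fs.defaultFS from the Hadoop configuration on the classpath.
  private val fs = FileSystem.get(new Configuration())
  private val writers = collection.mutable.Map[String, java.io.PrintWriter]()
  def write(key: String, value: Any) = {
    if (!writers.contains(key)) {
      val path = new Path("output/" + key + "/" + suffix)
      // fs.create creates missing parent directories for us.
      writers(key) = new java.io.PrintWriter(fs.create(path))
    }
    writers(key).println(value)
  }
  def close = writers.values.foreach(_.close)
}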

This makes a single pass over the RDD and performs no shuffle. It gives you one directory per key, with a number of files inside each.

answered by Daniel Darabos


This includes the compression codec, the necessary imports, and the pimp, as requested.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// TODO Need a macro to generate for each Tuple length, or perhaps can use shapeless
implicit class PimpedRDD[T1, T2](rdd: RDD[(T1, T2)]) {
  def writeAsMultiple(prefix: String, codec: String,
                      keyName: String = "key")
                     (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._

    rdd.toDF(keyName, "_2").write.partitionBy(keyName)
    .format("text").option("codec", codec).save(prefix)
  }
}

val myRdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

One subtle difference from the OP's example is that it will prefix <keyName>= to the directory names. E.g.

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec")

Would give:

prefix/key=1/part-00000
prefix/key=2/part-00000

where prefix/key=1/part-00000 would contain the lines a and b, and prefix/key=2/part-00000 would contain the line c.

And

myRdd.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "foo")

Would give:

prefix/foo=1/part-00000
prefix/foo=2/part-00000

It should be clear how to adapt this for Parquet.
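
For instance, a rough sketch, narrowed to String pairs so that toDF resolves without extra type evidence; note that Parquet compression is normally controlled via spark.sql.parquet.compression.codec (e.g. "gzip", "snappy") rather than a Hadoop codec class:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

// Sketch of a Parquet flavour of the pimp. Compression is left to the
// spark.sql.parquet.compression.codec setting rather than passed explicitly.
implicit class ParquetPimpedRDD(rdd: RDD[(String, String)]) {
  def writeAsMultipleParquet(prefix: String, keyName: String = "key")
                            (implicit sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    rdd.toDF(keyName, "_2").write.partitionBy(keyName).parquet(prefix)
  }
}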

Finally, below is an example for Dataset, which is perhaps nicer than using Tuples.

import org.apache.spark.sql.Dataset

implicit class PimpedDataset[T](dataset: Dataset[T]) {
  def writeAsMultiple(prefix: String, codec: String, field: String): Unit = {
    dataset.write.partitionBy(field)
    .format("text").option("codec", codec).save(prefix)
  }
}
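
Usage might then look like the sketch below; the case class and field names are illustrative, a Spark 2.x SparkSession named spark is assumed, and the text format expects a single remaining string column once the partition column is removed:

// Usage sketch for the Dataset pimp above.
import spark.implicits._

case class Record(myField: String, value: String)

val ds = Seq(Record("1", "a"), Record("1", "b"), Record("2", "c")).toDS()
ds.writeAsMultiple("prefix", "org.apache.hadoop.io.compress.GzipCodec", "myField")
// expect output under prefix/myField=1/ and prefix/myField=2/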

answered by samthebest