Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

If I increase the model size of my word2vec model I start to get this kind of exception in my log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried to write my own "save model" version which looks like this:

  def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {

    println("Saving model as CSV ..")

    val vectorSize = model.getVectors.values.head.size

    println("vectorSize="+vectorSize)

    val SEPARATOR_TOKEN = " "    
    val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }

    println("Got dataArray ..")
    println("parallelize(dataArray, 10)")
    val par = sc.parallelize(dataArray, 10)
          .map(d => {

            val sb = new mutable.StringBuilder()
            sb.append(d.word)
            sb.append(SEPARATOR_TOKEN)

            for(v <- d.vector) {
              sb.append(v)
              sb.append(SEPARATOR_TOKEN)
            }
            sb.setLength(sb.length - 1)
            sb.append("\n")
            sb.toString()
          })
    println("repartition(1)")
    val rep = par.repartition(1)
    println("collect()")
    val vectorsAsString = rep.collect()

    println("Collected serialized vectors ..")    

    val cfile = new mutable.StringBuilder()

    cfile.append(vectorsAsString.length)
    cfile.append(" ")
    cfile.append(vectorSize)
    cfile.append("\n")

    val sb = new StringBuilder
    sb.append("word,")
    for(i <- 0 until vectorSize) {
      sb.append("v")
      sb.append(i.toString)
      sb.append(",")
    }
    sb.setLength(sb.length - 1)
    sb.append("\n")

    for(vectorString <- vectorsAsString) {
      sb.append(vectorString)
      cfile.append(vectorString)
    }

    println("Saving file to " + new Path(path, "data").toUri.toString)
    sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path+".csv", "data").toUri.toString)
    sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path+".cs", "data").toUri.toString)
  }

Apparently it's working similar to their current implementation - it doesn't.

I'd like to get a word2vec model. It works with small files but not if the model gets larger.

like image 744
Stefan Falk Avatar asked Apr 23 '16 19:04

Stefan Falk


1 Answers

MetadataFetchFailedException is thrown when a MapOutputTracker on an executor could not find requested shuffle map outputs for partitions in local cache and tried to fetch them remotely from the driver's MapOutputTracker.

That could lead to few conclusions:

  1. The driver's memory issues
  2. The executors' memory issues
  3. Executors being lost

Please review the logs looking for issues reported as "Executor lost" INFO messages and/or review web UI's Executors page and see how the executors work.

The root cause of executors being lost may also be that the cluster manager has decided to kill ill-behaved executors (that may have used up more memory than requested).

See the other question FetchFailedException or MetadataFetchFailedException when processing big data set for more insights.

like image 66
Jacek Laskowski Avatar answered Nov 03 '22 05:11

Jacek Laskowski