Bypassing org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://[...] matches 0 files

This is a question I've already asked on the Spark user mailing list, and I hope to have more success here.

I'm not sure it's directly related to Spark, but Spark's behaviour has something to do with the fact that I can't easily work around the problem.

I'm trying to get some files from S3 using various patterns. My problem is that some of those patterns may match nothing, and when they do, I get the following exception:

org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
    at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
    ... 2 more

I would like a way to ignore missing files and just do nothing in that case. The problem, as I see it, is that I don't know whether a pattern will match anything until it is actually evaluated, and Spark only starts processing the data when an action occurs (here, the reduceByKey part). So I can't just catch an error somewhere and let things continue.

One solution would be to force Spark to process each path individually, but that would probably cost a lot in terms of speed and/or memory, so I'm looking for another option that would be efficient.
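
For reference, the closest thing I can come up with is to pre-check each pattern on the driver with the Hadoop FileSystem glob API before building the RDD. A rough, untested sketch (the helper class below is mine, and it costs one extra S3 listing per pattern):

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PatternFilter {
    // Keep only the patterns that match at least one object, so the survivors
    // can be joined with "," and passed to sc.textFile() without blowing up.
    // Note: this runs on the driver and issues one S3 listing per pattern.
    public static List<String> nonEmptyPatterns(List<String> patterns, Configuration conf) throws IOException {
        List<String> kept = new ArrayList<String>();
        for (String pattern : patterns) {
            FileSystem fs = FileSystem.get(URI.create(pattern), conf);
            FileStatus[] matches = fs.globStatus(new Path(pattern));
            if (matches != null && matches.length > 0) {
                kept.add(pattern);
            }
        }
        return kept;
    }
}

But that duplicates the globbing work Hadoop does anyway, which is why I'd prefer something cleaner.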

I'm using Spark 0.9.1. Thanks.

asked May 21 '14 by Crystark


2 Answers

OK, after digging a bit into Spark, and thanks to someone guiding me on the Spark user list, I think I've got it:

sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration())
    .map(new Function<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
            return arg0._2.toString();
        }
    })
    .count();

And the EmptiableTextInputFormat, which does the magic:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class EmptiableTextInputFormat extends TextInputFormat {
    // If a pattern matches no files, return an empty split list instead of
    // letting the InvalidInputException propagate.
    @Override
    public List<InputSplit> getSplits(JobContext arg0) throws IOException {
        try {
            return super.getSplits(arg0);
        } catch (InvalidInputException e) {
            return Collections.<InputSplit> emptyList();
        }
    }
}

One could also check the message of the InvalidInputException for a little more precision.
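
For example, one could replace getSplits above with something like this (just a sketch; the "matches 0 files" wording is what I see with my Hadoop version and may differ with others):

@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException {
    try {
        return super.getSplits(arg0);
    } catch (InvalidInputException e) {
        // Only swallow the "matches 0 files" case; rethrow anything else,
        // e.g. a genuinely missing, non-glob input path.
        if (e.getMessage() != null && e.getMessage().contains("matches 0 files")) {
            return Collections.<InputSplit>emptyList();
        }
        throw e;
    }
}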

answered by Crystark


For anyone wanting a quick hack, here is an example using sc.wholeTextFiles:

import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def wholeTextFilesIgnoreErrors(path: String, sc: SparkContext): RDD[(String, String)] = {
  // TODO This is a bit hacky; we probably ought to work out a better way using the lower-level Hadoop API.
  // Keep only the sub-paths that can actually be read, then hand the survivors to wholeTextFiles.
  sc.wholeTextFiles(path.split(",").filter(subPath => Try(sc.textFile(subPath).take(1)).isSuccess).mkString(","))
}
answered by samthebest