This is a question i've already asked on the spark user mailing list and i hope to get more success here.
I'm not sure it's directly related to spark though spark has something to do with the fact I can't easily resolve that problem.
I'm trying to get some files from S3 using various patterns. My problem is that some of those patterns may return nothing, and when they do so, i get the following exception:
org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3n://bucket/mypattern matches 0 files
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:197)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.FlatMappedRDD.getPartitions(FlatMappedRDD.scala:30)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:52)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:58)
at org.apache.spark.api.java.JavaPairRDD.reduceByKey(JavaPairRDD.scala:335)
... 2 more
I would like a way to ignore missing files and just do nothing in that case. The problem here IMO is that i don't know if a pattern will return something until it's actually executed and spark starts processing data only when an action occurs (here, the reduceByKey
part). So i can't just catch an error somewhere and let things continue on.
One solution would be to force spark to process each path individually but that will probably cost allot in terms of speed and/or memory so i'm looking for an other option that would be efficient.
I'm using spark 0.9.1. Thanks
Ok, digging a bit into Spark and thanks to someone guiding me on the spark user list I think i got it:
sc.newAPIHadoopFile("s3n://missingPattern/*", EmptiableTextInputFormat.class, LongWritable.class, Text.class, sc.hadoopConfiguration())
.map(new Function<Tuple2<LongWritable, Text>, String>() {
@Override
public String call(Tuple2<LongWritable, Text> arg0) throws Exception {
return arg0._2.toString();
}
})
.count();
And the EmptiableTextInputFormat
which does the magic:
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class EmptiableTextInputFormat extends TextInputFormat {
@Override
public List<InputSplit> getSplits(JobContext arg0) throws IOException {
try {
return super.getSplits(arg0);
} catch (InvalidInputException e) {
return Collections.<InputSplit> emptyList();
}
}
}
One could eventually check the message of the InvalidInputException
for a lil more precision.
For anyone wanting a quick hack, here is an example using sc.wholeTextFiles
def wholeTextFilesIgnoreErrors(path: String, sc: SparkContext): RDD[(String, String)] = {
// TODO This is a bit hacky, probabally ought to work out a better way using lower level hadoop api
sc.wholeTextFiles(path.split(",").filter(subPath => Try(sc.textFile(subPath).take(1)).isSuccess).mkString(","))
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With