How to allow spark to ignore missing input files?

I want to run a Spark job (Spark v1.5.1) over some generated S3 paths containing Avro files. I'm loading them with:

val avros = paths.map(p => sqlContext.read.avro(p))

Some of the paths won't exist, though. How can I get Spark to ignore those missing paths? Previously I've used this answer, but I'm not sure how to apply that approach with the new DataFrame API.

Note: I'm ideally looking for a similar approach to the linked answer that just makes input paths optional. I don't particularly want to have to explicitly check for the existence of paths in S3 (since that's cumbersome and may make development awkward), but I guess that's my fallback if there's no clean way to implement this now.
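For reference, that fallback (explicitly filtering out paths that don't exist) could look roughly like the sketch below, using the Hadoop FileSystem API. The existingPaths helper is hypothetical, not an established utility:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

//Hypothetical helper: keep only the paths that actually exist.
//Each path is resolved against its own filesystem (e.g. S3).
def existingPaths(paths: Seq[String], conf: Configuration): Seq[String] =
  paths.filter { p =>
    val path = new Path(p)
    path.getFileSystem(conf).exists(path)
  }

//Usage, assuming a SparkContext named sc:
//val avros = existingPaths(paths, sc.hadoopConfiguration).map(p => sqlContext.read.avro(p))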

asked Nov 10 '15 by jbrown


People also ask

What does take() do in Spark?

In Spark, take behaves like an array operation: it receives an integer value (say, n) as a parameter and returns an array of the first n elements of the dataset.
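A minimal sketch (the RDD and context here are illustrative, not from the question):

import org.apache.spark.{SparkConf, SparkContext}

object TakeExample extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("take-example"))

  //take(n) returns the first n elements as a local Array,
  //scanning only as many partitions as needed.
  val firstFive = sc.parallelize(1 to 100).take(5) // Array(1, 2, 3, 4, 5)
  println(firstFive.mkString(", "))

  sc.stop()
}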

Does Spark load all data in memory?

No. Spark's operators spill data to disk if it does not fit in memory, allowing Spark to run well on data of any size. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
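For example, the storage level can be set explicitly when caching; a small illustrative sketch (not from the original page, assuming a SparkContext named sc as in the answer below):

import org.apache.spark.storage.StorageLevel

//MEMORY_AND_DISK keeps partitions in memory and spills the ones
//that don't fit to disk rather than recomputing them on access.
val cached = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK)
println(cached.count())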


1 Answer

I would use the Scala Try type to handle the possibility of failure when reading a directory of Avro files. With Try we can make the possibility of failure explicit in our code and handle it in a functional manner:

object Main extends App {

  import scala.util.{Success, Try}
  import org.apache.spark.{SparkConf, SparkContext}
  import com.databricks.spark.avro._

  val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example"))
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  //the first path exists, the second one doesn't
  val paths = List("/data/1", "/data/2")

  //Wrap the attempt to read the paths in a Try, then use collect to filter
  //and map with a single partial function.
  val avros =
    paths
      .map(p => Try(sqlContext.read.avro(p)))
      .collect{
        case Success(df) => df
      }
  //Do whatever you want with your list of dataframes
  avros.foreach{ df =>
    //print each Row; printing the Array directly would only show its toString
    df.collect().foreach(println)
  }
  sc.stop()
}
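If the end goal is a single DataFrame spanning all the paths that do exist, the surviving frames can be combined with unionAll (available on DataFrame in Spark 1.5). This follow-up is my addition, not part of the original answer, and would go before sc.stop() above:

//reduceOption returns None when no path could be read,
//so an empty result is handled without an exception.
val combined = avros.reduceOption((a, b) => a.unionAll(b))
combined.foreach(df => println(df.count()))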
answered Oct 19 '22 by mattinbits