
Spark-Scala: Check whether an S3 directory exists before reading it

How do I check whether an S3 directory exists before reading it?

I tried this, as suggested here: http://bigdatatech.taleia.software/2015/12/21/check-if-exists-a-amazon-s3-path-from-apache-spark/

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val fs = FileSystem.get(new Configuration())
fs.listStatus(new Path("s3://s3bucket/2017/10/31/*/*/"))

but I get this error: Wrong FS: s3://s3bucket/2017/10/31/*/*, expected: hdfs://ip-172-31-55-167.ec2.internal:8020

I can check whether an exact file exists using its full path, but I need to use wildcards, as in "s3://s3bucket/2017/10/31/*/*", which this approach doesn't support.

I checked this Stack Overflow question: Spark: Read file only if the path exists, but it's not related to my use case.

asked Nov 06 '17 by Shivam Panjeta

2 Answers

This happens because when you call FileSystem.get(new Configuration()), the resolved file system is the default one, which in this case is HDFS.

You first need to obtain the right file system by providing a URI that contains the s3 scheme and your bucket.

It is also better to use the Hadoop configuration of your current Spark context, to ensure you use the same file system settings as Spark.

import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

// Create a session with Spark master running locally (vs on a cluster)
// for development purposes
val sparkSession = SparkSession.builder
                               .appName("My App")
                               .master("local")
                               .getOrCreate

val conf = sparkSession.sparkContext.hadoopConfiguration
val fs = FileSystem.get(URI.create("s3://s3bucket/"), conf)

This will work only if you have an implementation of the S3 file system on the classpath. Since you are running your program on an EC2 instance, you should be able to access S3 without specifying explicit AWS credentials.
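
With the file system resolved, you can also handle the wildcard part of the question with globStatus, which expands the pattern and lets you test whether anything matches before reading. A minimal sketch, assuming the sparkSession and fs from above, and treating the path layout and the Parquet format as placeholders:

import org.apache.hadoop.fs.Path

// Wildcard path from the question (hypothetical layout).
val pattern = new Path("s3://s3bucket/2017/10/31/*/*")

// globStatus expands the wildcards; it returns an empty array (or null on some
// Hadoop versions) when nothing matches, so treat both as "path does not exist".
val matches = fs.globStatus(pattern)

if (matches != null && matches.nonEmpty) {
  // Safe to read; the format is an assumption, use whatever matches your data.
  val df = sparkSession.read.parquet(pattern.toString)
} else {
  println(s"No objects match $pattern")
}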

answered Nov 20 '22 by Alexandre Dupriez

This follows the same approach: resolve the file system from the path's own URI, then call exists on it.

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// externalTableLocation and log are assumed to be defined elsewhere in the job.
val spark = SparkSession.builder.getOrCreate()
val sc = spark.sparkContext

// Resolve the file system from the table location itself, so an s3:// path is
// handled by the S3 file system rather than the default one (HDFS).
val fs = FileSystem.get(new URI(externalTableLocation), sc.hadoopConfiguration)

if (fs.exists(new Path(externalTableLocation))) {
  println("File exists")
  val maxPopulatedDate = spark.sql("SELECT MAX(DateID) FROM tier_ppw.DistributionDailyFact_Stage")
  log.info("Reading maxPopulatedDate: " + maxPopulatedDate)
} else {
  val maxPopulatedDate = "2016-01-01"
  log.info("Reading maxPopulatedDate: " + maxPopulatedDate)
}
answered Nov 20 '22 by Rajiv Singh