How to check if an S3 directory exists before reading it?
I was trying this, as suggested here: http://bigdatatech.taleia.software/2015/12/21/check-if-exists-a-amazon-s3-path-from-apache-spark/
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val fs = FileSystem.get(new Configuration())
fs.listStatus(new Path("s3://s3bucket/2017/10/31/*/*/"))
but getting this error
Wrong FS: s3://s3bucket/2017/10/31/*/*, expected: hdfs://ip-172-31-55-167.ec2.internal:8020
I can check whether an exact file exists using its full path, but here I have to use wildcards, like "s3://s3bucket/2017/10/31/*/*", which this approach doesn't support.
I checked this StackOverflow question: Spark : Read file only if the path exists, but it's not related to my use-case.
This is because when you call FileSystem.get(new Configuration()), the file system resolved is the default file system, which in this case is HDFS.
You first need to obtain the right file system by providing a URI built from a path that contains the s3 scheme and your bucket.
It would also be better to use the Hadoop configuration of your current Spark context, so that you use the same file system settings as Spark.
import java.net.URI
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession
// Create a session with Spark master running locally (vs on a cluster)
// for development purposes
val sparkSession = SparkSession.builder
.appName("My App")
.master("local")
.getOrCreate
val conf = sparkSession.sparkContext.hadoopConfiguration
val fs = FileSystem.get(URI.create("s3://s3bucket/"), conf)
This will work only if an implementation of the S3 file system is on your classpath. Since you are running your program on an EC2 instance, you should be able to access S3 without specifying explicit AWS credentials.
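Once you have the right file system, wildcard patterns can be checked with globStatus instead of listStatus. Below is a minimal sketch; the helper name pathExists is mine, not part of any API, and the commented S3 usage assumes the bucket and sc from the question.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper (pathExists is not a Hadoop API name):
// globStatus expands Hadoop glob patterns such as "*" and "{a,b}", and
// returns null or an empty array when nothing matches, so guard for both.
def pathExists(fs: FileSystem, pattern: String): Boolean = {
  val matches = fs.globStatus(new Path(pattern))
  matches != null && matches.nonEmpty
}

// On the cluster you would resolve the S3 file system first, e.g.:
//   val fs = FileSystem.get(URI.create("s3://s3bucket/"), sc.hadoopConfiguration)
//   pathExists(fs, "s3://s3bucket/2017/10/31/*/*")
```

This avoids the Wrong FS error because the file system is resolved from the s3 URI, and the glob is expanded by the same file system that will later read the data.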
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import java.net.URI
val sc = new SparkContext()
// externalTableLocation is assumed to be defined elsewhere, e.g. an s3:// path
if (FileSystem.get(new URI(externalTableLocation), sc.hadoopConfiguration)
      .exists(new Path(externalTableLocation))) {
  println("File exists")
  // spark.sql returns a DataFrame; extract the scalar before logging it
  val maxPopulatedDate = spark.sql("SELECT MAX(DateID) FROM tier_ppw.DistributionDailyFact_Stage")
    .first().get(0)
  log.info("Reading maxPopulatedDate: " + maxPopulatedDate)
} else {
  val maxPopulatedDate = "2016-01-01"
  log.info("Reading maxPopulatedDate: " + maxPopulatedDate)
}
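An alternative to pre-checking existence is to attempt the read and fall back to a default when it fails. A minimal sketch, assuming the same query and default date as above; the helper name readOrElse is mine, not from any library.

```scala
import scala.util.Try

// Hypothetical helper (readOrElse is not a library name): evaluate the
// read lazily and fall back to a default when it throws, e.g. because
// the path or table does not exist yet.
def readOrElse[A](read: => A, default: A): A = Try(read).getOrElse(default)

// On the cluster this could replace the if/else above, e.g.:
//   val maxPopulatedDate = readOrElse(
//     spark.sql("SELECT MAX(DateID) FROM tier_ppw.DistributionDailyFact_Stage")
//       .first().getString(0),
//     "2016-01-01")
```

Note that this also swallows other failures (permissions, network), so the explicit existence check is preferable when you need to distinguish "missing" from "broken".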