I use fileStream to read files from an HDFS directory in Spark (streaming context). If my Spark application shuts down and starts again after some time, I would like to read only the new files in the directory. I don't want to read the old files that were already read and processed by Spark; I am trying to avoid duplicates here.
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
Any code snippets to help?
You can use the FileSystem API:
import org.apache.hadoop.fs.{FileSystem, Path}

// Get a handle on the filesystem backing the SparkContext's Hadoop configuration
val fs = FileSystem.get(sc.hadoopConfiguration)

// Delete the path recursively if it already exists
val outPutPath = new Path("/abc")
if (fs.exists(outPutPath))
  fs.delete(outPutPath, true)
fileStream already handles that for you - from its Scaladoc:
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.
This means that fileStream would only load new files (created after the streaming context was started); any files that already existed in the folder before you started your streaming application would be ignored.
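For reference, here is a minimal, self-contained sketch of how that fileStream call could be wired into a standalone streaming job; the directory path, batch interval, and application name are placeholder assumptions, not taken from your setup:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NewFilesOnly {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("NewFilesOnly")
    val ssc = new StreamingContext(conf, Seconds(30))

    // fileStream monitors the directory for new files; files that were already
    // present before the context started are not picked up, so they are not
    // re-processed after a restart.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
      .map { case (_, text) => text.toString }

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}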