How do I delete files in an HDFS directory after reading them using Scala?

I use fileStream to read files from an HDFS directory in Spark (streaming context). If my Spark application shuts down and starts again after some time, I would like to read only the new files in the directory. I don't want to read old files that were already read and processed by Spark. I am trying to avoid duplicates here.

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")

Any code snippets to help?

asked Jul 14 '17 by user1125829

2 Answers

You can use the FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

// Get a handle to HDFS from the SparkContext's Hadoop configuration
val fs = FileSystem.get(sc.hadoopConfiguration)

val outPutPath = new Path("/abc")

// Delete the path recursively if it exists
if (fs.exists(outPutPath))
  fs.delete(outPutPath, true)
answered Oct 04 '22 by Ishan Kumar
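If the goal is to delete the input files once each batch has been processed, one way to wire the FileSystem calls above into the streaming job is a cleanup step inside foreachRDD. This is only a sketch, not part of the original answer: the directory /home/File comes from the question, and it assumes no new files land in the directory while a batch is still being processed (otherwise they would be deleted before being read).

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val inputDir = new Path("/home/File")   // directory monitored by fileStream (from the question)

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](inputDir.toString)

lines.map(_._2.toString).foreachRDD { rdd =>
  // placeholder processing; replace with the real logic
  rdd.foreach(line => println(line))

  // Assumption: everything currently in the directory has been consumed by
  // this point, so it is safe to remove the files before the next batch.
  val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
  fs.listStatus(inputDir).foreach { status =>
    if (status.isFile) fs.delete(status.getPath, false)
  }
}

A common alternative to deleting is renaming processed files into an archive directory, which keeps reprocessing possible if something goes wrong downstream.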


fileStream already handles that for you - from its Scaladoc:

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format.

This means that fileStream will only load new files (created after the streaming context was started); any files that already existed in the folder before you started your streaming application will be ignored.

answered Oct 04 '22 by Tzach Zohar
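One follow-up point, since the question mentions the application shutting down and coming back later: if you also want the job to recover its state across restarts rather than relying only on the restart time, the usual pattern is to create the StreamingContext through StreamingContext.getOrCreate with a checkpoint directory. Below is a rough sketch under that assumption; the checkpoint path, batch interval and app name are placeholders, not values from the question.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "/home/checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("hdfs-file-stream")   // placeholder app name
  val ssc  = new StreamingContext(conf, Seconds(30))          // placeholder batch interval
  ssc.checkpoint(checkpointDir)

  val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File")
  lines.map(_._2.toString).print()   // replace with the real processing

  ssc
}

// On a clean start this builds a fresh context; after a crash or restart it is
// recovered from the checkpoint, so the stream does not start from scratch.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()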