If my Spark Streaming job is down for some reason (e.g. a demo or a deployment) while files keep being written or moved into the HDFS directory, those files may be skipped once I bring the job back up.
val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")
hdfsDStream.foreachRDD(
  rdd => logInfo("Number of records in this batch: " + rdd.count())
)
Output --> Number of records in this batch: 0
Is there a way for Spark Streaming to move the files it has already read to a different folder, so that it does not read them again? Or do we have to program that manually?
Is Spark Streaming the same as running a Spark job (sc.textFile) from cron?
As Dean mentioned, textFileStream by default only picks up new files.
def textFileStream(directory: String): DStream[String] = {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
So all it does is call this variant of fileStream:
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
Looking at the FileInputDStream class, we can see that it can indeed pick up existing files, but defaults to new files only:
newFilesOnly: Boolean = true,
So, going back to the StreamingContext code, we can see that there is an overload we can use by calling the fileStream method directly:
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}
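So instead of textFileStream, call that overload with newFilesOnly set to false:
// The argument list must start on the same line as the method name;
// otherwise Scala parses the parenthesized line as a separate statement.
ssc.fileStream[LongWritable, Text, TextInputFormat](
  directory, FileInputDStream.defaultFilter, false).map(_._2.toString)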
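One caveat with the call above: as far as I can tell, FileInputDStream and its defaultFilter are marked private[streaming] in the Spark source, so user code may not be able to reference them. Here is a minimal sketch that passes its own filter instead. The object name ExistingFilesStream, the helper visibleFilesOnly, the 10-second batch interval and the local master fallback are all my own assumptions for illustration, not anything taken from Spark or from the question.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ExistingFilesStream {

  // Hypothetical stand-in for Spark's internal default filter:
  // accept every file whose name does not start with a dot.
  def visibleFilesOnly(path: Path): Boolean = !path.getName.startsWith(".")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ExistingFilesStream")
      .setIfMissing("spark.master", "local[2]") // assumption: local fallback for testing

    // Assumption: 10-second batches; use whatever interval the job needs.
    val ssc = new StreamingContext(conf, Seconds(10))

    val directory = "hdfs://sandbox.hortonworks.com/user/root/logs"

    // newFilesOnly = false asks the stream to also consider files
    // that were already in the directory when the job started.
    val lines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](
        directory, visibleFilesOnly _, newFilesOnly = false)
      .map(_._2.toString)

    lines.foreachRDD { rdd =>
      println("Number of records in this batch: " + rdd.count())
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that even with newFilesOnly = false, I believe Spark only looks back over a limited "remember" window (controlled by spark.streaming.fileStream.minRememberDuration in the versions I have seen), so files much older than that window may still be ignored. Check this against the Spark version you are running.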