Spark Streaming: HDFS

  1. I can't get my Spark job to stream "old" files from HDFS.

If my Spark job is down for some reason (e.g. a demo or deployment) while files continue to be written/moved into the HDFS directory, those files are skipped once I bring the Spark Streaming job back up.

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

Output --> Number of records in this batch: 0

  1. Is there a way for Spark Streaming to move the "read" files to a different folder, or do we have to program that manually? That way it would avoid re-reading already-processed files.

  2. Is Spark Streaming the same as running a batch Spark job (sc.textFile) from cron?

sophie asked Apr 30 '26

1 Answer

As Dean mentioned, textFileStream uses a default that picks up only new files:

  def textFileStream(directory: String): DStream[String] = {
    fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
  }

So all it does is call this variant of fileStream:

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }

And looking at the FileInputDStream class, we can see that it can indeed pick up existing files, but defaults to new files only:

newFilesOnly: Boolean = true,

So, going back into the StreamingContext code, we can see that there is an overload we can use by calling the fileStream method directly:

def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
  }
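
For reference, the default filter used above does nothing more than skip hidden files (names starting with a dot); in the Spark source it is roughly:

    def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

So passing it along with newFilesOnly = false keeps the existing filtering behavior while still picking up files already in the directory.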

So, the TL;DR is:

ssc.fileStream[LongWritable, Text, TextInputFormat]
    (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
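
Putting it together, here is a minimal end-to-end sketch (the app name and batch interval are placeholders; it assumes Spark Streaming is on the classpath and uses the asker's HDFS path). An inline filter equivalent to FileInputDStream.defaultFilter is used, since that object is internal to the streaming package:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("HdfsBackfillStream")
    val ssc = new StreamingContext(conf, Seconds(30))

    // newFilesOnly = false: also process files already present in the directory
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
        "hdfs://sandbox.hortonworks.com/user/root/logs",
        (path: Path) => !path.getName.startsWith("."),  // same as defaultFilter
        newFilesOnly = false)
      .map(_._2.toString)

    lines.foreachRDD(rdd => println("Number of records in this batch: " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()

One caveat to be aware of: even with newFilesOnly = false, FileInputDStream only considers files whose modification time falls within its "remember window", so very old files may still be skipped unless that window is raised (spark.streaming.fileStream.minRememberDuration in newer Spark versions).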
Justin Pihony answered May 03 '26