Spark Streaming textFileStream not supporting wildcards

I set up a simple test to stream text files from S3 and got it to work when I tried something like

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")

and as log files went into the bucket, everything worked fine.

But if there was a subfolder, it would not find any files placed in that subfolder (and yes, I am aware that HDFS doesn't actually use a folder structure).

val input = ssc.textFileStream("s3n://mybucket/2015/04/")

So I tried simply using wildcards, as I have done before with a standard Spark application:

val input = ssc.textFileStream("s3n://mybucket/2015/04/*")

But when I try this, it throws an error:

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....

I know for a fact that you can use wildcards when reading file input in a standard Spark application, but it appears that streaming input neither supports wildcards nor automatically processes files in subfolders. Is there something I'm missing here?
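
For comparison, this kind of glob works fine in a plain (non-streaming) job, with sc being the usual SparkContext:

// Batch reads expand Hadoop glob patterns, so this picks up every file under April
val aprilLogs = sc.textFile("s3n://mybucket/2015/04/*")
println(aprilLogs.count())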

Ultimately, what I need is a streaming job running 24/7 that monitors an S3 bucket where logs are placed by date.

So something like

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>

Is there any way to hand it the topmost folder and have it automatically read files that show up in any subfolder (since the date will obviously advance every day)?

EDIT

Upon digging into the documentation at http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources, I found that it states nested directories are not supported.

Can anyone shed some light as to why this is the case?

Also, since my files are nested by date, what would be a good way to solve this in my streaming application? It's a little complicated because the logs take a few minutes to get written to S3, so the last file of the day may be written to the previous day's folder even though we're already a few minutes into the new day.


1 Answer

An "ugly but working" solution can be created by extending FileInputDStream. Writing ssc.textFileStream(d) is equivalent to:

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

You can create a CustomFileInputDStream that extends FileInputDStream. The custom class copies the compute method from the FileInputDStream class and adjusts the findNewFiles method to your needs.
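
As a rough outline (a sketch, not the full class from this answer): because FileInputDStream and much of what it uses are private[streaming], one way to make this compile is to place the custom class under the org.apache.spark.streaming.dstream package and copy only what you need:

package org.apache.spark.streaming.dstream

import scala.reflect.ClassTag

import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

import org.apache.spark.streaming.StreamingContext

// Skeleton only: the real work is pasting compute() and the modified findNewFiles()
// from the matching Spark version into this class.
class CustomFileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V] : ClassTag](
    ssc_ : StreamingContext,
    directory: String)
  extends FileInputDStream[K, V, F](ssc_, directory) {

  // 1. Copy compute(...) from FileInputDStream unchanged, so that it calls the
  //    findNewFiles defined in this class rather than the private original.
  // 2. Copy findNewFiles(...) and replace its body with the modified version
  //    shown below.
  // Depending on the Spark version, some private fields referenced by those
  // methods may also need to be copied here, since they are private to
  // FileInputDStream.
}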

Changing the findNewFiles method from:

  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

to:

  // Note: this version needs scala.collection.mutable.ArrayBuffer and
  // org.apache.hadoop.fs.FileStatus in scope.
  private def findNewFiles(currentTime: Long): Array[String] = {
    try {
      lastNewFileFindingTime = clock.getTimeMillis()

      // Calculate ignore threshold
      val modTimeIgnoreThreshold = math.max(
        initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
        currentTime - durationToRemember.milliseconds  // trailing end of the remember window
      )
      logDebug(s"Getting new files for time $currentTime, " +
        s"ignoring files older than $modTimeIgnoreThreshold")
      val filter = new PathFilter {
        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
      }
      val directories = fs.listStatus(directoryPath).filter(_.isDirectory)
      val newFiles = ArrayBuffer[FileStatus]()

      directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*))

      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
      logInfo("Finding new files took " + timeTaken + " ms")
      logDebug("# cached file times = " + fileToModTime.size)
      if (timeTaken > slideDuration.milliseconds) {
        logWarning(
          "Time taken to find new files exceeds the batch size. " +
            "Consider increasing the batch size or reducing the number of " +
            "files in the monitored directory."
        )
      }
      newFiles.map(_.getPath.toString).toArray
    } catch {
      case e: Exception =>
        logWarning("Error finding new files", e)
        reset()
        Array.empty
    }
  }

will check for files in all first-degree subfolders. You can adjust it to use the batch timestamp in order to access only the relevant "subdirectories", as sketched below.
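
For example, a minimal sketch of that timestamp-based variant (my own illustration; directoriesForBatch is a hypothetical helper, and the yyyy/MM/dd layout is assumed from the question):

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

import org.apache.hadoop.fs.{FileSystem, Path}

// Resolve the day folders relevant for this batch: today's folder plus
// yesterday's, so logs that land a few minutes late in the previous day's
// folder are still picked up.
def directoriesForBatch(fs: FileSystem, root: Path, currentTime: Long): Seq[Path] = {
  val fmt = new SimpleDateFormat("yyyy/MM/dd")
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  val dayMillis = 24L * 60 * 60 * 1000
  Seq(currentTime, currentTime - dayMillis)
    .map(t => new Path(root, fmt.format(new Date(t))))
    .distinct
    .filter(p => fs.exists(p)) // skip folders that don't exist yet
}

// Inside the custom findNewFiles, instead of listing every subdirectory:
// val newFiles = directoriesForBatch(fs, directoryPath, currentTime)
//   .flatMap(dir => fs.listStatus(dir, filter))
//   .map(_.getPath.toString)
//   .toArray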

I created the CustomFileInputDStream as I mentioned and activated it by calling:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

It seems to behave as expected.

When writing a solution like this, I must add some points for consideration:

  • You are breaking Spark's encapsulation and creating a custom class that you will have to maintain on your own as time passes.

  • I believe a solution like this is a last resort. If your use case can be implemented in a different way, it is usually better to avoid a solution like this.

  • If you have a lot of "subdirectories" on S3 and check each one of them, it will cost you.

  • It would be very interesting to understand whether Databricks doesn't support nested directories just because of a possible performance penalty, or whether there is a deeper reason I haven't thought about.
