How does Spark Streaming's `fileStream` identify new files in the monitored directory from one batch interval to the next?
Is it based on new file names, the file creation timestamp, or some other approach?
What is the significance of the `newFilesOnly` argument?
fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass, Function<org.apache.hadoop.fs.Path,Boolean> filter, boolean newFilesOnly, org.apache.hadoop.conf.Configuration conf)
The quick answer on the monitoring is that it uses the file modification time (`isNewFile` uses `getFileModTime`).
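A minimal sketch of that check in plain Java, with no Spark dependency. The class name `NewFileCheck`, the `recentlySelected` set, and the exact conditions are a simplified model for illustration based on my reading of the behaviour, not Spark's actual code: a file counts as new when its modification time is later than an ignore threshold, not later than the current batch time, and the file has not been picked up before.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a modification-time based "is this file new?" check.
class NewFileCheck {
    // Paths already handed to an earlier batch (stand-in for the stream's own bookkeeping).
    private final Set<String> recentlySelected = new HashSet<>();

    boolean isNewFile(String path, long modTime, long currentTime, long modTimeIgnoreThreshold) {
        if (modTime <= modTimeIgnoreThreshold) return false; // too old, ignored
        if (modTime > currentTime) return false;              // belongs to a later batch
        if (recentlySelected.contains(path)) return false;    // already processed
        recentlySelected.add(path);                           // remember it for later batches
        return true;
    }

    public static void main(String[] args) {
        NewFileCheck check = new NewFileCheck();
        // Arguments: path, modTime, currentTime, threshold (all in milliseconds).
        System.out.println(check.isNewFile("/data/a.txt", 150L, 200L, 100L));
        System.out.println(check.isNewFile("/data/a.txt", 150L, 200L, 100L));
    }
}
```

Note that the file name only matters for de-duplication here; the decision about whether a file is in scope is driven by its modification time.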
As for `newFilesOnly`, that is not as straightforward, but you can work it out from the code.
The TL;DR is that with `newFilesOnly = false` it only pulls old files from the last minute before the stream started.
The more complicated answer is that this argument sets a value, `initialModTimeIgnoreThreshold`, to either the current time or 0. That value is then used to compute `modTimeIgnoreThreshold`, which is the max of it and the oldest modification time that still falls inside the search window (`currentTime - durationToRemember.milliseconds`). That is where an existing bug was just fixed: the time window used to be a hard-coded one minute, so with this argument set to false you could only ever get files modified within the last minute. I am still skeptical about this fix, but either way this argument was essentially broken until 3 days ago.
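The threshold arithmetic described above can be sketched in plain Java. This is a model of the description in this answer, not Spark's source: the class `ModTimeThreshold`, its method names, and the sample numbers are hypothetical, and the 60,000 ms window stands in for the remember duration.

```java
// Hypothetical model of how newFilesOnly feeds into the modification-time threshold.
class ModTimeThreshold {
    // newFilesOnly = true  -> ignore everything modified before the stream started
    // newFilesOnly = false -> no initial cutoff
    static long initialThreshold(boolean newFilesOnly, long streamStartTime) {
        return newFilesOnly ? streamStartTime : 0L;
    }

    // The effective cutoff is the later of the initial cutoff and the start of the remember window.
    static long threshold(long initialThreshold, long currentTime, long rememberWindowMs) {
        return Math.max(initialThreshold, currentTime - rememberWindowMs);
    }

    public static void main(String[] args) {
        long start = 1_000_000L;       // stream start time in ms (made-up value)
        long window = 60_000L;         // remember window in ms (assumed one minute)
        long now = start + 10_000L;    // first batch, 10 seconds after start

        // newFilesOnly = true: cutoff stays at the stream start -> prints 1000000
        System.out.println(threshold(initialThreshold(true, start), now, window));
        // newFilesOnly = false: cutoff is now - window -> prints 950000
        System.out.println(threshold(initialThreshold(false, start), now, window));
    }
}
```

With `newFilesOnly = false` the cutoff is bounded only by the remember window, which is why the size of that window decides how far back old files are picked up.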