I'm trying to read/monitor txt files from a Hadoop file system directory. But I've noticed all txt files inside this directory are directories themselves as showed in this example bellow:
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/_SUCCESS
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/part-00000
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/part-00001
I'd want read all the data inside the part's files. I'm trying to use the following code as showed in this snippet:
val testData = ssc.textFileStream("/crawlerOutput/*/*")
But, unfortunately it said it doesn't exist /crawlerOutput/*/*. Doesn't textFileStream accept wildcards? What should I do to solve this problem?
The textFileStream() is just a wrapper for fileStream() and does not support subdirectories (see https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html).
You would need to list the specific directories to monitor. If you need to detect new directories a StreamingListener could be used to check then stop streaming context and restart with new values.
Just thinking out loud.. If you intend to process each subdirectory once and just want to detect these new directories then potentially key off another location that may contain job info or a file token that once present could be consumed in the streaming context and call the appropriate textFile() to ingest the new path.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With