I'm new to Spark Streaming and I'm trying to get started with it using spark-shell. Assume I have a directory called "dataTest" placed in the root directory of spark-1.2.0-bin-hadoop2.4.
The simple code that I want to test in the shell (after typing $.\bin\spark-shell) is:
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
val data = ssc.textFileStream("dataTest")
println("Nb lines is equal to= "+data.count())
data.foreachRDD { (rdd, time) => println(rdd.count()) }
ssc.start()
ssc.awaitTermination()
Then I copy some files into the directory "dataTest" (I also tried renaming some existing files in this directory).
Unfortunately I did not get what I wanted (i.e., I didn't get any output, so it seems like ssc.textFileStream doesn't work correctly), just log lines like these:
15/01/15 19:32:46 INFO JobScheduler: Added jobs for time 1421346766000 ms
15/01/15 19:32:46 INFO JobScheduler: Starting job streaming job 1421346766000 ms.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO SparkContext: Starting job: foreachRDD at <console>:20
15/01/15 19:32:46 INFO DAGScheduler: Job 69 finished: foreachRDD at <console>:20, took 0,000021 s
0
15/01/15 19:32:46 INFO JobScheduler: Finished job streaming job 1421346766000 ms.0 from job set of time 1421346766000 ms
15/01/15 19:32:46 INFO MappedRDD: Removing RDD 137 from persistence list
15/01/15 19:32:46 INFO JobScheduler: Total delay: 0,005 s for time 1421346766000 ms (execution: 0,002 s)
15/01/15 19:32:46 INFO BlockManager: Removing RDD 137
15/01/15 19:32:46 INFO UnionRDD: Removing RDD 78 from persistence list
15/01/15 19:32:46 INFO BlockManager: Removing RDD 78
15/01/15 19:32:46 INFO FileInputDStream: Cleared 1 old files that were older than 1421346706000 ms: 1421346704000 ms
15/01/15 19:32:46 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
Did you try moving text files from another directory into the directory that is being monitored? For the file stream to work, you have to put the files into the monitored directory atomically, so that as soon as a file becomes visible in the directory listing, Spark can read all the data in it (which may not be the case if you are copying files into the directory).
This is well documented in the Basic Sources subsection of the programming guide.
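As a rough illustration of the "write elsewhere, then move" advice above, here is a minimal sketch in Scala. The helper name `publishAtomically` and the staging directory are hypothetical and not part of Spark. It assumes the staging directory is on the same filesystem as the monitored one; otherwise `ATOMIC_MOVE` may throw `AtomicMoveNotSupportedException`.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

// Hypothetical helper (not a Spark API): write the complete file in a staging
// directory first, then move it into the monitored directory in a single step,
// so the file only becomes visible there once it is fully written.
def publishAtomically(stagingDir: Path, monitoredDir: Path,
                      name: String, lines: Seq[String]): Path = {
  Files.createDirectories(stagingDir)
  Files.createDirectories(monitoredDir)

  // 1. Write the whole content outside the monitored directory.
  val staged = stagingDir.resolve(name)
  Files.write(staged, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))

  // 2. Move it into place. ATOMIC_MOVE assumes both directories live on the
  //    same filesystem; otherwise the JVM may refuse the move.
  Files.move(staged, monitoredDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}
```

With the streaming context already started, a call such as `publishAtomically(Paths.get("staging"), Paths.get("dataTest"), "batch-1.txt", Seq("a", "b"))` could be run from a second shell or a separate program. Because the file is written just before the move, its modification time is recent, which seems to matter for whether the stream picks it up.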
Copying the file from the command line, or saving it into the directory with "Save as", works for me. A normal copy (e.g. through an IDE) does not update the file's modification date, and the streaming context monitors the modification date.