I use Spark 1.3.1 and Python 2.7.
This is my first experience with Spark Streaming.
I am trying an example that reads data from a file using Spark Streaming.
This is the link to the example: https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/hdfs_wordcount.py
My code is the following:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
lines = ssc.textFileStream('../inputs/2.txt')
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda x: (x, 1))\
              .reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
The content of the 2.txt file is the following:
a1 b1 c1 d1 e1 f1 g1 a2 b2 c2 d2 e2 f2 g2 a3 b3 c3 d3 e3 f3 g3
I expect that something related to the file content will appear in the console, but there is nothing. Nothing except text like this every second:
-------------------------------------------
Time: 2015-09-03 15:08:18
-------------------------------------------
followed by Spark's own log output.
Am I doing something wrong? If not, why does it not work?
I faced a similar issue, and what I realized is that once the streaming is running, the StreamingContext only picks up data from new files. It ingests only data that is newly placed in the source directory after the streaming is up.
Actually, the PySpark documentation makes this very explicit:
textFileStream(directory)
Create an input stream that monitors a Hadoop-compatible file system for new files and reads them as text files. Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
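In other words, textFileStream expects a directory to monitor, not a single file, and it only processes files that appear in that directory after ssc.start(). Here is a minimal sketch of how to drive your example (the ../inputs directory path is illustrative):

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

# Monitor the directory, not a single file.
lines = ssc.textFileStream('../inputs')
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda x: (x, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
# Only files moved into ../inputs AFTER this point get picked up, e.g.:
#   mv /tmp/2.txt ../inputs/
ssc.awaitTermination()

With the stream already running, moving 2.txt into ../inputs (rather than pointing the stream at a pre-existing file) triggers a batch, and the word counts then appear under the Time: ... header.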