
Spark streaming network_wordcount.py does not print result

I start ncat and then submit the simple sample application network_wordcount.py.

When I enter words in the ncat console, I see the following output:

...
15/01/20 16:55:19 INFO scheduler.JobScheduler: Added jobs for time 1421769319000 ms
15/01/20 16:55:19 INFO storage.MemoryStore: ensureFreeSpace(12) called with curMem=65052, maxMem=280248975
15/01/20 16:55:19 INFO storage.MemoryStore: Block input-0-1421769319200 stored as bytes in memory (estimated size 12.0 B, free 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerInfo: Added input-0-1421769319200 in memory on localhost:34754 (size: 12.0 B, free: 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerMaster: Updated info of block input-0-1421769319200 
15/01/20 16:55:19 WARN storage.BlockManager: Block input-0-1421769319200 replicated to only 0 peer(s) instead of 1 peers
15/01/20 16:55:19 INFO receiver.BlockGenerator: Pushed block input-0-1421769319200
...

So Spark seems to receive something, but it does NOT print the expected result, such as (hello, 1).
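
Based on the documentation, pprint() should write each batch to the console in roughly this format (the timestamp here is illustrative):

-------------------------------------------
Time: 2015-01-20 16:55:19
-------------------------------------------
(u'hello', 1)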

Any help is welcome.

Regards, Felix

PS: I am using CentOS. I have to invoke ncat as follows so that Spark receives any data: ncat -lkv6 9999
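
For completeness, here is roughly how I launch both sides in two terminals (the submit line follows the example's usage, network_wordcount.py <hostname> <port>):

# terminal 1: the listener (these flags are needed on my CentOS box)
ncat -lkv6 9999

# terminal 2: submit the streaming example from the spark-1.2.0 directory
./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999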

Here are the full contents of the log:

15/01/20 16:55:08 INFO spark.SecurityManager: Changing view acls to: root
15/01/20 16:55:08 INFO spark.SecurityManager: Changing modify acls to: root
15/01/20 16:55:08 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/01/20 16:55:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/01/20 16:55:09 INFO Remoting: Starting remoting
15/01/20 16:55:09 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@160.85.30.108:54048]
15/01/20 16:55:09 INFO util.Utils: Successfully started service 'sparkDriver' on port 54048.
15/01/20 16:55:09 INFO spark.SparkEnv: Registering MapOutputTracker
15/01/20 16:55:09 INFO spark.SparkEnv: Registering BlockManagerMaster
15/01/20 16:55:10 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150120165510-b6c3
15/01/20 16:55:10 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB
15/01/20 16:55:10 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/01/20 16:55:10 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-51e3b95d-8b3d-4eed-8571-b38205b7ba9c
15/01/20 16:55:10 INFO spark.HttpServer: Starting HTTP Server
15/01/20 16:55:11 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/01/20 16:55:11 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37603
15/01/20 16:55:11 INFO util.Utils: Successfully started service 'HTTP file server' on port 37603.
15/01/20 16:55:11 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/01/20 16:55:11 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/01/20 16:55:11 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/01/20 16:55:11 INFO ui.SparkUI: Started SparkUI at http://srv-lab-t-734.zhaw.ch:4040
15/01/20 16:55:11 INFO util.Utils: Copying /root/tmp/spark/spark-1.2.0/examples/src/main/python/streaming/network_wordcount.py to /tmp/spark-c048d2f8-8ea8-4a13-a160-758c2875abec/network_wordcount.py
15/01/20 16:55:11 INFO spark.SparkContext: Added file file:/root/tmp/spark/spark-1.2.0/examples/src/main/python/streaming/network_wordcount.py at http://160.85.30.108:37603/files/network_wordcount.py with timestamp 1421769311787
15/01/20 16:55:12 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@160.85.30.108:54048/user/HeartbeatReceiver
15/01/20 16:55:12 INFO netty.NettyBlockTransferService: Server created on 34754
15/01/20 16:55:12 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/01/20 16:55:12 INFO storage.BlockManagerMasterActor: Registering block manager localhost:34754 with 267.3 MB RAM, BlockManagerId(<driver>, localhost, 34754)
15/01/20 16:55:12 INFO storage.BlockManagerMaster: Registered BlockManager
15/01/20 16:55:12 INFO scheduler.ReceiverTracker: ReceiverTracker started
15/01/20 16:55:13 INFO dstream.ForEachDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO python.PythonTransformedDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@162df93d
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO python.PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@24461f2b
15/01/20 16:55:13 INFO dstream.ForEachDStream: Slide time = 1000 ms
15/01/20 16:55:13 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
15/01/20 16:55:13 INFO dstream.ForEachDStream: Checkpoint interval = null
15/01/20 16:55:13 INFO dstream.ForEachDStream: Remember duration = 1000 ms
15/01/20 16:55:13 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@77a852a7
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Starting 1 receivers
15/01/20 16:55:13 INFO util.RecurringTimer: Started timer for JobGenerator at time 1421769314000
15/01/20 16:55:13 INFO scheduler.JobGenerator: Started JobGenerator at 1421769314000 ms
15/01/20 16:55:13 INFO scheduler.JobScheduler: Started JobScheduler
15/01/20 16:55:13 INFO spark.SparkContext: Starting job: start at NativeMethodAccessorImpl.java:-2
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Got job 0 (start at NativeMethodAccessorImpl.java:-2) with 1 output partitions (allowLocal=false)
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Final stage: Stage 0(start at NativeMethodAccessorImpl.java:-2)
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Missing parents: List()
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at start at NativeMethodAccessorImpl.java:-2), which has no missing parents
15/01/20 16:55:13 INFO storage.MemoryStore: ensureFreeSpace(35112) called with curMem=0, maxMem=280248975
15/01/20 16:55:13 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 34.3 KB, free 267.2 MB)
15/01/20 16:55:13 INFO storage.MemoryStore: ensureFreeSpace(19994) called with curMem=35112, maxMem=280248975
15/01/20 16:55:13 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 19.5 KB, free 267.2 MB)
15/01/20 16:55:13 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:34754 (size: 19.5 KB, free: 267.2 MB)
15/01/20 16:55:13 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/01/20 16:55:13 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
15/01/20 16:55:13 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (ParallelCollectionRDD[0] at start at NativeMethodAccessorImpl.java:-2)
15/01/20 16:55:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/01/20 16:55:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1937 bytes)
15/01/20 16:55:13 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
15/01/20 16:55:13 INFO executor.Executor: Fetching http://160.85.30.108:37603/files/network_wordcount.py with timestamp 1421769311787
15/01/20 16:55:13 INFO util.Utils: Fetching http://160.85.30.108:37603/files/network_wordcount.py to /tmp/fetchFileTemp2647961966239862063.tmp
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Registered receiver 0
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/20 16:55:13 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1421769314000
15/01/20 16:55:13 INFO receiver.BlockGenerator: Started BlockGenerator
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Starting receiver
15/01/20 16:55:13 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
15/01/20 16:55:13 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/01/20 16:55:13 INFO receiver.BlockGenerator: Started block pushing thread
15/01/20 16:55:13 INFO dstream.SocketReceiver: Connecting to localhost:9993
15/01/20 16:55:13 INFO dstream.SocketReceiver: Connected to localhost:9993
15/01/20 16:55:14 INFO scheduler.JobScheduler: Added jobs for time 1421769314000 ms
15/01/20 16:55:14 INFO scheduler.JobScheduler: Starting job streaming job 1421769314000 ms.0 from job set of time 1421769314000 ms
15/01/20 16:55:14 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:344
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Registering RDD 3 (call at /root/tmp/spark/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py:1206)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Final stage: Stage 2(runJob at PythonRDD.scala:344)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Missing parents: List()
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Submitting Stage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43), which has no missing parents
15/01/20 16:55:14 INFO storage.MemoryStore: ensureFreeSpace(5696) called with curMem=55106, maxMem=280248975
15/01/20 16:55:14 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.6 KB, free 267.2 MB)
15/01/20 16:55:14 INFO storage.MemoryStore: ensureFreeSpace(4250) called with curMem=60802, maxMem=280248975
15/01/20 16:55:14 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.2 KB, free 267.2 MB)
15/01/20 16:55:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:34754 (size: 4.2 KB, free: 267.2 MB)
15/01/20 16:55:14 INFO storage.BlockManagerMaster: Updated info of block broadcast_1_piece0
15/01/20 16:55:14 INFO spark.SparkContext: Created broadcast 1 from getCallSite at DStream.scala:294
15/01/20 16:55:14 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (PythonRDD[7] at RDD at PythonRDD.scala:43)
15/01/20 16:55:14 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
15/01/20 16:55:15 INFO scheduler.JobScheduler: Added jobs for time 1421769315000 ms
15/01/20 16:55:16 INFO scheduler.JobScheduler: Added jobs for time 1421769316000 ms
15/01/20 16:55:17 INFO scheduler.JobScheduler: Added jobs for time 1421769317000 ms
15/01/20 16:55:18 INFO scheduler.JobScheduler: Added jobs for time 1421769318000 ms
15/01/20 16:55:19 INFO scheduler.JobScheduler: Added jobs for time 1421769319000 ms
15/01/20 16:55:19 INFO storage.MemoryStore: ensureFreeSpace(12) called with curMem=65052, maxMem=280248975
15/01/20 16:55:19 INFO storage.MemoryStore: Block input-0-1421769319200 stored as bytes in memory (estimated size 12.0 B, free 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerInfo: Added input-0-1421769319200 in memory on localhost:34754 (size: 12.0 B, free: 267.2 MB)
15/01/20 16:55:19 INFO storage.BlockManagerMaster: Updated info of block input-0-1421769319200
15/01/20 16:55:19 WARN storage.BlockManager: Block input-0-1421769319200 replicated to only 0 peer(s) instead of 1 peers
15/01/20 16:55:19 INFO receiver.BlockGenerator: Pushed block input-0-1421769319200
15/01/20 16:55:20 INFO scheduler.JobScheduler: Added jobs for time 1421769320000 ms
15/01/20 16:55:21 INFO scheduler.JobScheduler: Added jobs for time 1421769321000 ms
15/01/20 16:55:22 INFO scheduler.JobScheduler: Added jobs for time 1421769322000 ms
Asked Jan 20 '15 by Felix Mueller


1 Answer

I think you should allocate more cores when submitting the application. For example:

spark-submit --master local[4] your_file.py

From Learning Spark chapter 10:

Do not run Spark Streaming programs locally with master configured as local or local[1]. This allocates only one CPU for tasks and if a receiver is running on it, there is no resource left to process the received data. Use at least local[2] to have more cores.
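
For context, here is the core of network_wordcount.py roughly as it ships with Spark 1.2 (abridged; argument checking omitted). It ends in pprint(), which produces exactly the console output the question is missing; with local[1] the receiver permanently occupies the only task slot, so the pprint job never gets to run:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # read lines from the socket given on the command line, e.g. localhost 9999
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()  # the output that never shows up when no core is free

    ssc.start()
    ssc.awaitTermination()

So for this particular case the submit command would be something like (using the port ncat listens on):

spark-submit --master local[2] examples/src/main/python/streaming/network_wordcount.py localhost 9999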

Answered Jan 03 '23 by pzecevic