I just built Spark on a Windows 7 machine (using sbt) and am walking through the quick start. The Spark job fails when calling first().
I am new to Java and don't have a clear idea what the error stack trace is telling me, although it appears to be related to a java.net.SocketException given the messages. Note that I am not using a Hadoop install. Also note that the same example runs without errors in the Scala shell.
Environment:
Windows 7
Spark 1.2.1
Anaconda Python 2.7.8
Scala 2.10.4
sbt 0.13.7
JDK 1.7.0_75
In [2]: path = u'C:\\Users\\striji\\Documents\\Personal\\python\\pyspark-flights\\2001.csv.bz2'
In [3]: textFile = sc.textFile(path)
In [4]: textFile
Out[4]: C:\Users\striji\Documents\Personal\python\pyspark-flights\2001.csv.bz2 MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
In [5]: textFile.count()
...
Out[5]: 5967781
In [6]: textFile.first()
15/02/19 08:52:01 INFO SparkContext: Starting job: runJob at PythonRDD.scala:344
15/02/19 08:52:01 INFO DAGScheduler: Got job 1 (runJob at PythonRDD.scala:344) with 1 output partitions (allowLocal=true)
15/02/19 08:52:01 INFO DAGScheduler: Final stage: Stage 1(runJob at PythonRDD.scala:344)
15/02/19 08:52:01 INFO DAGScheduler: Parents of final stage: List()
15/02/19 08:52:01 INFO DAGScheduler: Missing parents: List()
15/02/19 08:52:01 INFO DAGScheduler: Submitting Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(4560) called with curMem=46832, maxMem=278302556
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.5 KB, free 265.4 MB)
15/02/19 08:52:01 INFO MemoryStore: ensureFreeSpace(3417) called with curMem=51392, maxMem=278302556
15/02/19 08:52:01 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.3 KB, free 265.4 MB)
15/02/19 08:52:01 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:51106 (size: 3.3 KB, free: 265.4 MB)
15/02/19 08:52:01 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
15/02/19 08:52:01 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838
15/02/19 08:52:01 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (PythonRDD[3] at RDD at PythonRDD.scala:43)
15/02/19 08:52:01 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
15/02/19 08:52:01 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1341 bytes)
15/02/19 08:52:01 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/02/19 08:52:04 INFO HadoopRDD: Input split: file:/C:/Users/striji/Documents/Personal/python/pyspark-flights/2001.csv.bz2:0+83478700
15/02/19 08:52:04 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/19 08:52:05 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/19 08:52:05 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/02/19 08:52:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/02/19 08:52:05 INFO TaskSchedulerImpl: Cancelling stage 1
15/02/19 08:52:05 INFO DAGScheduler: Job 1 failed: runJob at PythonRDD.scala:344, took 3.796728 s
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-674a86098a8f> in <module>()
----> 1 textFile.first()
c:\spark-1.2.1\python\pyspark\rdd.pyc in first(self)
1137 ValueError: RDD is empty
1138 """
-> 1139 rs = self.take(1)
1140 if rs:
1141 return rs[0]
c:\spark-1.2.1\python\pyspark\rdd.pyc in take(self, num)
1119
1120 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1121 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1122
1123 items += res
c:\spark-1.2.1\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
825 # SparkContext#runJob.
826 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 827 it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
828 return list(mappedRDD._collect_iterator_through_file(it))
829
c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:
c:\spark-1.2.1\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0
in stage 1.0 (TID 1, localhost): java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:196)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:110)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
In Spark, a stage failure happens when there is a problem processing a Spark task. These failures can be caused by hardware issues, incorrect Spark configuration, or problems in the code. When a stage fails, the driver log reports an exception like the one above: org.apache.spark.SparkException: Job aborted due to stage failure.
Py4JJavaError(msg, java_exception) is the exception py4j raises on the Python side when the Java code invoked from Python throws; the original JVM exception is attached to the error object, which is why the Java stack trace appears inside the Python traceback above.
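If it helps to see the underlying JVM error directly, here is a minimal sketch (assuming the same textFile RDD from the session above) that catches the Py4JJavaError and prints the wrapped Java exception:
from py4j.protocol import Py4JJavaError

try:
    textFile.first()
except Py4JJavaError as e:
    # e.java_exception is a proxy for the JVM exception object;
    # printing it shows the Java-side message, which is usually more
    # informative than the Python-side traceback.
    print(e.java_exception)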
According to the error, your RDD is empty: you are calling first() on something that does not exist. Try this PySpark example:
# build a small RDD in-process and take its first element
people = ["1,Maj,123", "2,Pvt,333", "3,Col,999"]
rdd1 = sc.parallelize(people)
rdd1.first()
It should output:
'1,Maj,123'
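If the concern really is an empty RDD, a quick sketch for guarding against it (assuming the same textFile RDD, and Spark 1.2.x, which to my knowledge does not yet have RDD.isEmpty()) is to look at take(1) before calling first():
# take(1) returns a list with at most one element; an empty list
# means first() would raise "ValueError: RDD is empty"
rows = textFile.take(1)
if rows:
    print(rows[0])
else:
    print("RDD is empty")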