Here's my code:
# map function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line
# load the data
idsFile = "ids.txt" # Should be some file on your system
linesData = sc.textFile(idsFile).cache()
# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)
# setup task
answers = filteredLinesData.mapPartitions(echo, 2) # split file into 2 pieces
And the file ids.txt:
1,2,3,4,5,1
6,4,8,3,2
9,9,9,9,9,9
100000000,1
10,10,10
1,2,4,2
To run it, I run:
$ IPYTHON=1 pyspark --master local[2]
And then I %cpaste the code in (is there a better way to do that?).
If I simply try to take() and see the values, I get a sensible output:
In[2]: filteredLinesData.take(6)
Out[2]:
[u'1,2,3,4,5,1',
u'6,4,8,3,2',
u'9,9,9,9,9,9',
u'100000000,1',
u'10,10,10',
u'1,2,4,2']
But when I try to actually execute the mapPartitions() job I set up, it fails:
In [3]: executed = answers.collect()
14/09/12 11:18:22 INFO SparkContext: Starting job: collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO DAGScheduler: Got job 2 (collect at <ipython-input-3-6461aec48699>:1) with 2 output partitions (allowLocal=false)
14/09/12 11:18:22 INFO DAGScheduler: Final stage: Stage 2(collect at <ipython-input-3-6461aec48699>:1)
14/09/12 11:18:22 INFO DAGScheduler: Parents of final stage: List()
14/09/12 11:18:22 INFO DAGScheduler: Missing parents: List()
14/09/12 11:18:22 INFO DAGScheduler: Submitting Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37), which has no missing parents
14/09/12 11:18:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37)
14/09/12 11:18:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:0 as 3112 bytes in 1 ms
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:1 as 3112 bytes in 0 ms
14/09/12 11:18:22 INFO Executor: Running task ID 0
14/09/12 11:18:22 INFO Executor: Running task ID 1
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_1 not found, computing it
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_0 not found, computing it
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:31+32
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:0+31
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(288) called with curMem=133256, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_1 stored as values to memory (estimated size 288.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(304) called with curMem=133544, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 304.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_0 in memory on 18.111.61.9:58306 (size: 304.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_1 in memory on 18.111.61.9:58306 (size: 288.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_0
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_1
0 => 1,2,3,4,5,1
1 => 6,4,8,3,2
2 => 9,9,9,9,9,9
0 => 100000000,1
1 => PySpark worker failed with exception:10,10,10
2 => 1,2,4,2
PySpark worker failed with exception:
Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
14/09/12 11:18:22 ERROR Executor: Exception in task ID 1
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR Executor: Exception in task ID 0
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 WARN TaskSetManager: Lost TID 0 (task 2.0:0)
14/09/12 11:18:22 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR TaskSetManager: Task 2.0:0 failed 1 times; aborting job
14/09/12 11:18:22 INFO TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
[duplicate 1]
14/09/12 11:18:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/09/12 11:18:22 INFO DAGScheduler: Failed to run collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO TaskSchedulerImpl: Cancelling stage 2
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-6461aec48699> in <module>()
----> 1 executed = answers.collect()
/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/rdd.pyc in collect(self)
581 """
582 with _JavaStackTrace(self.context) as st:
--> 583 bytesInJava = self._jrdd.collect().iterator()
584 return list(self._collect_iterator_through_file(bytesInJava))
585
/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
535 answer = self.gateway_client.send_command(command)
536 return_value = get_return_value(answer, self.gateway_client,
--> 537 self.target_id, self.name)
538
539 for temp_arg in temp_args:
/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(
Py4JJavaError: An error occurred while calling o38.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
for obj in iterator:
File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
for item in iterator:
TypeError: 'NoneType' object is not iterable
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
It's clearly doing the right thing for the inputs, but for some reason at the end Spark is trying to feed the mapPartitions() function something that causes a TypeError: 'NoneType' object is not iterable error.
Any ideas what I'm doing wrong? I'm completely new to Spark.
The problem here is that mapPartitions accepts a function that returns an iterable object, such as a list or generator. Your echo function implicitly returns None, which is why PySpark complains that 'NoneType' object is not iterable.
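To make that contract concrete, here's a small illustrative sketch (the function names are made up for the example):

def bad_echo(lines):    # falls off the end, so it returns None -> not iterable
    for i, line in enumerate(lines):
        print i, "=>", line

def good_echo(lines):   # returns a list, which mapPartitions can iterate
    return ["%d => %s" % (i, line) for i, line in enumerate(lines)]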
As Jonathan suggested, you could use this function (unmodified, actually) with foreachPartition. I believe that this will print your intended output when running PySpark in local mode, but it might not be what you want when you deploy on a cluster: the output from your print statements will be printed in the workers' logs, not displayed on the driver.
Instead, I'd modify your echo function to yield (i, "=>", line); the function then returns a generator, which is what mapPartitions expects.
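A minimal sketch of that change (illustrative only; the exact output format is up to you):

# map function: yielding tuples makes echo a generator,
# so mapPartitions gets an iterable back from each partition
def echo(lines):
    for i, line in enumerate(lines):
        yield (i, "=>", line)

answers = filteredLinesData.mapPartitions(echo)
print answers.collect()  # tuples come back to the driver, e.g. (0, '=>', u'1,2,3,4,5,1')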
A couple of problems:
First, to run the script from within IPython, you can use execfile() or %run.
Second, mapPartitions doesn't take a number-of-partitions parameter; perhaps it did at some point? You can use parallelize to set the number of partitions explicitly, as in the sketch below.
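For illustration, a hedged sketch of setting the partition count where the RDD is created instead:

# set partitioning when creating the RDD, not in mapPartitions()
nums = sc.parallelize([1, 2, 3, 4, 5, 6], 2)   # two partitions, set explicitly
print nums.glom().collect()                     # e.g. [[1, 2, 3], [4, 5, 6]]
lines = sc.textFile("ids.txt", 2)               # minimum-partitions hint for a text file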
If you run it like that, you'll get output as you expect, but errors of the form:
TypeError: 'NoneType' object is not iterable
That's because mapPartitions is a transformation; it expects a function that takes a partition of an RDD and returns a new partition of an RDD. You are producing output as a side effect, but not returning a new RDD partition. In other words, you're looking for an action, not a transformation. foreach acts on each element; foreachPartition works partition by partition, but expects a generator which returns None:
# map function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line
    yield None
# load the data
idsFile = "ids.txt" # Should be some file on your system
linesData = sc.textFile(idsFile).cache()
# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)
# setup task
answers = filteredLinesData.foreachPartition(echo)