 

simple mapping partitions job in (py)spark

Here's my code:

# map function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line

# load the data
idsFile = "ids.txt"  # Should be some file on your system
linesData = sc.textFile(idsFile).cache()

# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

# setup task
answers = filteredLinesData.mapPartitions(echo, 2)  # split file into 2 pieces

And the file ids.txt:

1,2,3,4,5,1
6,4,8,3,2
9,9,9,9,9,9
100000000,1
10,10,10
1,2,4,2

To run it, I run:

$ IPYTHON=1 pyspark --master local[2]

And then I %cpaste the code in (is there a better way to do that?).

If I simply try to take() and see the values, I get a sensible output:

In[2]: filteredLinesData.take(6)
Out[2]:
[u'1,2,3,4,5,1',
 u'6,4,8,3,2',
 u'9,9,9,9,9,9',
 u'100000000,1',
 u'10,10,10',
 u'1,2,4,2']

But when I try to actually execute the setup mapPartitions() job, it fails:

In [3]: executed = answers.collect()
14/09/12 11:18:22 INFO SparkContext: Starting job: collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO DAGScheduler: Got job 2 (collect at <ipython-input-3-6461aec48699>:1) with 2 output partitions (allowLocal=false)
14/09/12 11:18:22 INFO DAGScheduler: Final stage: Stage 2(collect at <ipython-input-3-6461aec48699>:1)
14/09/12 11:18:22 INFO DAGScheduler: Parents of final stage: List()
14/09/12 11:18:22 INFO DAGScheduler: Missing parents: List()
14/09/12 11:18:22 INFO DAGScheduler: Submitting Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37), which has no missing parents
14/09/12 11:18:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37)
14/09/12 11:18:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:0 as 3112 bytes in 1 ms
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:1 as 3112 bytes in 0 ms
14/09/12 11:18:22 INFO Executor: Running task ID 0
14/09/12 11:18:22 INFO Executor: Running task ID 1
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_1 not found, computing it
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_0 not found, computing it
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:31+32
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:0+31
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(288) called with curMem=133256, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_1 stored as values to memory (estimated size 288.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(304) called with curMem=133544, maxMem=308910489
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 304.0 B, free 294.5 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_0 in memory on 18.111.61.9:58306 (size: 304.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_1 in memory on 18.111.61.9:58306 (size: 288.0 B, free: 294.6 MB)
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_0
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_1
0 => 1,2,3,4,5,1
1 => 6,4,8,3,2
2 => 9,9,9,9,9,9
0 => 100000000,1
1 => PySpark worker failed with exception:10,10,10

2 => 1,2,4,2
PySpark worker failed with exception:
Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

14/09/12 11:18:22 ERROR Executor: Exception in task ID 1
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR Executor: Exception in task ID 0
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 WARN TaskSetManager: Lost TID 0 (task 2.0:0)
14/09/12 11:18:22 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
14/09/12 11:18:22 ERROR TaskSetManager: Task 2.0:0 failed 1 times; aborting job
14/09/12 11:18:22 INFO TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable
 [duplicate 1]
14/09/12 11:18:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/09/12 11:18:22 INFO DAGScheduler: Failed to run collect at <ipython-input-3-6461aec48699>:1
14/09/12 11:18:22 INFO TaskSchedulerImpl: Cancelling stage 2
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-6461aec48699> in <module>()
----> 1 executed = answers.collect()

/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/rdd.pyc in collect(self)
    581         """
    582         with _JavaStackTrace(self.context) as st:
--> 583           bytesInJava = self._jrdd.collect().iterator()
    584         return list(self._collect_iterator_through_file(bytesInJava))
    585 

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    535         answer = self.gateway_client.send_command(command)
    536         return_value = get_return_value(answer, self.gateway_client,
--> 537                 self.target_id, self.name)
    538 
    539         for temp_arg in temp_args:

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o38.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream
    for obj in iterator:
  File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched
    for item in iterator:
TypeError: 'NoneType' object is not iterable

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        org.apache.spark.scheduler.Task.run(Task.scala:51)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It's clearly doing the right thing with the inputs, but for some reason at the end Spark is feeding the mapPartitions() function something that causes a TypeError: 'NoneType' object is not iterable error.

Any ideas what I'm doing wrong? I'm completely new to Spark.

asked Sep 12 '14 by lollercoaster


2 Answers

The problem here is that mapPartitions expects a function that returns an iterable object, such as a list or generator. Your echo function implicitly returns None, which is why PySpark complains that 'NoneType' object is not iterable.

As Jonathan suggested, you could use this function (unmodified, actually) with foreachPartition. I believe that this will print your intended output when running PySpark in local mode, but it might not be what you want when you deploy on a cluster: the output from your print statements will be printed in the worker's logs, not displayed on the driver.

Instead, I'd modify your echo function to yield (i, "=>", line), so that calling it returns a generator.
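
A minimal sketch of that change (reusing filteredLinesData from the question; the tuple format is just for illustration):

# map function, rewritten as a generator so mapPartitions gets an iterable back
def echo(lines):
    for i, line in enumerate(lines):
        yield (i, "=>", line)

answers = filteredLinesData.mapPartitions(echo)
answers.collect()
# [(0, '=>', u'1,2,3,4,5,1'), (1, '=>', u'6,4,8,3,2'), ..., (0, '=>', u'100000000,1'), ...]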

answered by Josh Rosen


A couple of problems:

First, to run the script from within IPython, you can use execfile() or the %run magic.
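
For example, assuming the code above is saved as sumlines.py (a made-up filename), either of these works at the pyspark/IPython prompt:

%run sumlines.py           # IPython magic: executes the file in the shell's namespace
execfile("sumlines.py")    # plain Python 2 built-in with the same effect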

Second, mapPartitions doesn't take a number-of-partitions parameter (perhaps it did at some point?). You can use parallelize to set the number of partitions explicitly.
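
For example (a sketch; with textFile the number is a minimum partition count, not an exact split):

# for an in-memory collection, parallelize takes the number of slices directly
nums = sc.parallelize([1, 2, 3, 4, 5, 6], 2)

# when reading a file, pass a minimum number of partitions to textFile instead
linesData = sc.textFile(idsFile, 2).cache()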

If you run it like that, you'll get the output you expect, but also errors of the form:

TypeError: 'NoneType' object is not iterable

That's because mapPartitions is a transformation; it expects a function that takes one partition of an RDD and returns a new partition of an RDD. Your function produces output only as a side effect and never returns a new partition. In other words, you're looking for an action, not a transformation. foreach acts on each element; foreachPartition works partition by partition, but expects a generator that yields None:

# map function
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line
    yield None

# load the data
idsFile = "ids.txt"  # Should be some file on your system
linesData = sc.textFile(idsFile).cache()

# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

# setup task
answers = filteredLinesData.foreachPartition(echo)
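
Note that foreachPartition is an action: it runs as soon as it's called and returns None (so answers above is just None). In local[2] mode the printed lines appear in your shell; on a real cluster they would end up in the executors' logs instead.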
answered by Jonathan Dursi