Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Bag of words with pySpark reduceByKey

I am trying to do some text mining tasks with pySpark. I am new to Spark and I've been following this example http://mccarroll.net/blog/pyspark2/index.html to build the bag of words for my data.

Originally my data looked something like this

df.show(5)
+------------+---------+----------------+--------------------+
|Title       |Month    |     Author     |            Document|
+------------+---------+----------------+--------------------+
|      a     |      Jan|     John       |This is a document  |
|      b     |      Feb|     Mary       |A book by Mary      |
|      c     |      Mar|     Luke       |Newspaper article   |
+------------+---------+----------------+--------------------+

So far I have extracted the terms of each document with

bow0 = df.rdd\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))

Which gives me

[('This', 1),
 ('is', 1),
 ('a', 1),
 ('document', 1)]

But when I try to compute the frequency with reduceByKey and try to see the result

bow0.reduceByKey(lambda x,y:x+y).take(50)

I get this error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-53-966f90775397> in <module>()
----> 1 bow0.reduceByKey(lambda x,y:x+y).take(50)

/usr/local/spark/python/pyspark/rdd.py in take(self, num)
   1341 
   1342             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343             res = self.context.runJob(self, takeUpToNumLeft, p)
   1344 
   1345             items += res

/usr/local/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994 

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 31.0 failed 4 times, most recent failure: Lost task 1.3 in stage 31.0 (TID 84, 9.242.64.15, executor 7): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:455)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/local/spark/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/usr/local/spark/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 236, in mergeValues
    for k, v in iterator:
  File "<ipython-input-48-5c0753c6b152>", line 1, in <lambda>
AttributeError: 'NoneType' object has no attribute 'replace'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:404)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
like image 738
Catalina Herrera Avatar asked Oct 28 '25 02:10

Catalina Herrera


1 Answers

To expand on my comment, the error you are receiving is due to the presence of a null value in your Document column. Here's a small example to demonstrate:

data = [
    ['a', 'Jan', 'John', 'This is a document'],
    ['b', 'Feb', 'Mary', 'A book by Mary'],
    ['c', 'Mar', 'Luke', 'Newspaper article'],
    ['d', 'Apr', 'Mark', None]
]
columns = ['Title', 'Month', 'Author', 'Document']
df = spark.createDataFrame(data, columns)
df.show()
#+-----+-----+------+------------------+
#|Title|Month|Author|          Document|
#+-----+-----+------+------------------+
#|    a|  Jan|  John|This is a document|
#|    b|  Feb|  Mary|    A book by Mary|
#|    c|  Mar|  Luke| Newspaper article|
#|    d|  Apr|  Mark|              null|
#+-----+-----+------+------------------+

For the last row, the value in the Document column is null. When you compute bow0 as in your question, when the map function operates on that row it tries to call x.Document.replace where x is None. This results in AttributeError: 'NoneType' object has no attribute 'replace'.

One way to overcome this is to filter out the bad values before calling map:

bow0 = df.rdd\
    .filter(lambda x: x.Document)\
    .map( lambda x: x.Document.replace(',',' ').replace('.',' ').replace('-',' ').lower())\
    .flatMap(lambda x: x.split())\
    .map(lambda x: (x, 1))
bow0.reduceByKey(lambda x,y:x+y).take(50)
#[(u'a', 2),
# (u'this', 1),
# (u'is', 1),
# (u'newspaper', 1),
# (u'article', 1),
# (u'by', 1),
# (u'book', 1),
# (u'mary', 1),
# (u'document', 1)]

Or you can build in the check for None condition inside of your map function. In general, it is good practice to make your map function robust to bad inputs.


As an aside, you can do the same thing using the DataFrame API functions. In this case:

from pyspark.sql.functions import explode, split, regexp_replace, col, lower
df.select(explode(split(regexp_replace("Document", "[,.-]", " "), "\s+")).alias("word"))\
    .groupby(lower(col("word")).alias("lower"))\
    .count()\
    .show()
#+---------+-----+
#|    lower|count|
#+---------+-----+
#| document|    1|
#|       by|    1|
#|newspaper|    1|
#|  article|    1|
#|     mary|    1|
#|       is|    1|
#|        a|    2|
#|     this|    1|
#|     book|    1|
#+---------+-----+
like image 133
pault Avatar answered Oct 31 '25 11:10

pault