Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pyspark DataFrame UDF on Text Column

Tags:

I'm trying to do some NLP text clean up of some Unicode columns in a PySpark DataFrame. I've tried in Spark 1.3, 1.5 and 1.6 and can't seem to get things to work for the life of me. I've also tried using Python 2.7 and Python 3.4.

I've created an extremely simple udf as seen below that should just return a string back for each record in a new column. Other functions will manipulate the text and then return the changed text back in a new column.

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf

def dummy_function(data_str):
    cleaned_str = 'dummyData' 
    return cleaned_str

dummy_function_udf = udf(dummy_function, StringType())

Some sample data can be unzipped from here.

Here is the code I use to import the data and then apply the udf on.

# Load a text file and convert each line to a Row.
lines = sc.textFile("classified_tweets.txt")
parts = lines.map(lambda l: l.split("\t"))
training = parts.map(lambda p: (p[0], p[1]))

# Create dataframe
training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])

training_df.show(5)
+--------------------+--------------+
|               tweet|classification|
+--------------------+--------------+
|rt @jiffyclub: wi...|        python|
|rt @arnicas: ipyt...|        python|
|rt @treycausey: i...|        python|
|what's my best op...|        python|
|rt @raymondh: #py...|        python|
+--------------------+--------------+

# Apply UDF function
df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
df.show(5)

When I run the df.show(5) I get the following error. I understand that the problem most likely doesn't stem from the show() but the trace doesn't give me much help.

 ---------------------------------------------------------------------------Py4JJavaError                             Traceback (most recent call last)<ipython-input-19-0b21c233c724> in <module>()
      1 df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
----> 2 df.show(5)
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    255         +---+-----+
    256         """
--> 257         print(self._jdf.showString(n, truncate))
    258 
    259     def __repr__(self):
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()
/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(
Py4JJavaError: An error occurred while calling o474.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-12-4bc30395aac5>", line 4, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:913)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Actual function I'm trying:

def tag_and_remove(data_str):
    cleaned_str = ' '
    # noun tags
    nn_tags = ['NN', 'NNP', 'NNP', 'NNPS', 'NNS']
    # adjectives
    jj_tags = ['JJ', 'JJR', 'JJS']
    # verbs
    vb_tags = ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ']
    nltk_tags = nn_tags + jj_tags + vb_tags

    # break string into 'words'
    text = data_str.split()

    # tag the text and keep only those with the right tags
    tagged_text = pos_tag(text)
    for tagged_word in tagged_text:
        if tagged_word[1] in nltk_tags:
            cleaned_str += tagged_word[0] + ' '

    return cleaned_str


tag_and_remove_udf = udf(tag_and_remove, StringType())
like image 556
dreyco676 Avatar asked Jan 15 '16 03:01

dreyco676


1 Answers

Your dataset isn't clean. 985 lines split('\t') to only one value:

>>> from operator import add
>>> lines = sc.textFile("classified_tweets.txt")
>>> parts = lines.map(lambda l: l.split("\t"))
>>> parts.map(lambda l: (len(l), 1)).reduceByKey(add).collect()
[(2, 149195), (1, 985)]
>>> parts.filter(lambda l: len(l) == 1).take(5)
[['"show me the money!”  at what point do you start trying to monetize your #startup? tweet us with #startuplife.'],
 ['a good pitch can mean money in the bank for your #startup. see how body language plays a key role:  (via: ajalumnify)'],
 ['100+ apps in five years? @2359media did it using microsoft #azure:  #azureapps'],
 ['does buying better coffee make you a better leader? little things can make a big difference:  (via: @jmbrandonbb)'],
 ['.@msftventures graduates pitched\xa0#homeautomation #startups to #vcs! check out how they celebrated: ']]

So changing your code to:

>>> training = parts.filter(lambda l: len(l) == 2).map(lambda p: (p[0], p[1].strip()))
>>> training_df = sqlContext.createDataFrame(training, ["tweet", "classification"])
>>> df = training_df.withColumn("dummy", dummy_function_udf(training_df['tweet']))
>>> df.show(5)
+--------------------+--------------+---------+
|               tweet|classification|    dummy|
+--------------------+--------------+---------+
|rt @jiffyclub: wi...|        python|dummyData|
|rt @arnicas: ipyt...|        python|dummyData|
|rt @treycausey: i...|        python|dummyData|
|what's my best op...|        python|dummyData|
|rt @raymondh: #py...|        python|dummyData|
+--------------------+--------------+---------+
only showing top 5 rows
like image 97
AChampion Avatar answered Oct 07 '22 12:10

AChampion