Pyspark: PicklingError: Could not serialize object:

I have the following two data frames, df_whitelist and df_text:

+-------+--------------------+
|keyword|    whitelist_terms |
+-------+--------------------+
|     LA|             LA city|
|     LA|        US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+

In df_whitelist, each keyword corresponds to a set of terms; for example, keyword LA corresponds to “LA city” and “US LA in da”. In df_text, I have text and the keywords found in that text. For each piece of text, such as “the client as ada...”, and for each of its keywords (“client” and “ada”), I want to check all the whitelist terms for that keyword and count how many times each term occurs in the text. This is what I have tried:

import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def whitelisting(text,listOfKeyword,df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x:len(re.findall(x["whitelist_terms"],text))).reduce(lambda x, y: x+y)
            found_whiteterms_count = found_whiteterms_count + n    
    return found_whiteterms_count     
whitelisting_udf = F.udf(lambda text,listOfKeyword: whitelisting(text,listOfKeyword,df_whitelist),T.IntegerType())
df_text.withColumn("whitelist_counts", whitelisting_udf(df_text.Text,df_text.Keywords))

and I got the error:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

I could not figure this out after trying for some time. Could anybody help point out the problem and how to fix it? Thanks.

user1740987 asked Nov 12 '17 13:11


1 Answer

You are passing a PySpark dataframe, df_whitelist, to a UDF, and PySpark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF, which is not possible: a dataframe can only be used on the driver, while a UDF runs on the executors. Keep in mind that your function is called once per row of your dataframe, so keep its computations simple, and only use a UDF when the work cannot be done with PySpark SQL functions.
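
If the whitelist is small enough to fit in driver memory, one possible alternative (not the approach taken in the rest of this answer) is to collect it into a plain Python dict, which can be pickled, and let the UDF close over that dict instead of the dataframe. The sketch below uses plain Python only; whitelist_rows is a hypothetical stand-in for the result of df_whitelist.collect(), and count_whitelist_terms is an illustrative name.

```python
import pickle
from collections import defaultdict

# Hypothetical stand-in for df_whitelist.collect(): plain (keyword, term) tuples.
whitelist_rows = [
    ("LA", "LA city"),
    ("LA", "US LA in da"),
    ("client", "this client has i"),
    ("client", "our client"),
]

# Group the terms by keyword into an ordinary dict.
grouped = defaultdict(list)
for keyword, term in whitelist_rows:
    grouped[keyword].append(term)
whitelist = dict(grouped)

# Unlike a dataframe, a plain dict survives a pickle round trip.
restored = pickle.loads(pickle.dumps(whitelist))


def count_whitelist_terms(text, keywords, terms_by_keyword):
    """Sum literal occurrences in text of every term whitelisted for each keyword."""
    total = 0
    for keyword in keywords.split(";"):
        # Keywords missing from the whitelist contribute nothing.
        for term in terms_by_keyword.get(keyword, []):
            total += text.count(term)
    return total
```

In Spark, such a function could then be wrapped with something like F.udf(lambda text, kws: count_whitelist_terms(text, kws, whitelist), T.IntegerType()). For a large whitelist, a join is likely the better fit.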

What you need to do instead is join the two dataframes on keyword. Let's start with the two sample dataframes you provided:

df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]], 
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]], 
    ["Text", "Keywords"])

The Keywords column in df_text needs some processing: we have to split the string into an array and then explode it so that there is only one keyword per row:

import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

    +-----------------+-------+
    |             Text|keyword|
    +-----------------+-------+
    |the client as ada| client|
    |the client as ada|    ada|
    |this client has l| client|
    |this client has l|     LA|
    +-----------------+-------+
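
As a rough illustration of what split followed by explode does, the same reshaping can be written in plain Python over a list of tuples. The names rows and exploded are illustrative only and not part of the Spark code:

```python
# Each input row is (Text, Keywords), with keywords separated by ";".
rows = [
    ("the client as ada", "client;ada"),
    ("this client has l", "client;LA"),
]

# "split" turns the keyword string into a list;
# "explode" emits one output row per element of that list.
exploded = [
    (text, keyword)
    for text, keywords in rows
    for keyword in keywords.split(";")
]

for row in exploded:
    print(row)
```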

Now we can join the two data frames on keyword:

df = df_text.join(df_whitelist, "keyword", "leftouter")

    +-------+-----------------+-----------------+
    |keyword|             Text|  whitelist_terms|
    +-------+-----------------+-----------------+
    |     LA|this client has l|          LA city|
    |     LA|this client has l|      US LA in da|
    |    ada|the client as ada|             null|
    | client|the client as ada|this client has i|
    | client|the client as ada|       our client|
    | client|this client has l|this client has i|
    | client|this client has l|       our client|
    +-------+-----------------+-----------------+

  • The first condition in your UDF translates to: if a keyword in df_text is not present in df_whitelist, then count 0. In the left join this is equivalent to whitelist_terms being NULL, since that keyword only appears in the left data frame.

  • The second condition counts the number of times whitelist_terms appears in Text: Text.count(whitelist_terms)
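
One thing to be aware of: the code in the question counts matches with re.findall, which treats each term as a regular expression, while Text.count(term) counts non-overlapping literal occurrences. The two can disagree when a term contains regex metacharacters. A small sketch with a hypothetical term (not taken from the sample data):

```python
import re

text = "abc a.c"
term = "a.c"  # hypothetical whitelist term containing a regex metacharacter

# As a regex, "." matches any character, so both "abc" and "a.c" match.
regex_hits = len(re.findall(term, text))

# str.count looks for the literal substring "a.c" only.
literal_hits = text.count(term)

# re.escape makes the regex behave like a literal match.
escaped_hits = len(re.findall(re.escape(term), text))

print(regex_hits, literal_hits, escaped_hits)
```

If the whitelist terms are meant as literal phrases, str.count (or an escaped pattern) is probably the safer choice.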

We'll write a UDF to do this:

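from pyspark.sql.types import IntegerType

# Count non-overlapping literal occurrences of term in Text; a NULL term counts as 0.
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text", 
    "keyword", 
    # Keywords with no whitelist match have NULL whitelist_terms after the left join.
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))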

    +-----------------+-------+----------------+
    |             Text|keyword|whitelist_counts|
    +-----------------+-------+----------------+
    |this client has l|     LA|               0|
    |this client has l|     LA|               0|
    |the client as ada|    ada|               0|
    |the client as ada| client|               0|
    |the client as ada| client|               0|
    |this client has l| client|               0|
    |this client has l| client|               0|
    +-----------------+-------+----------------+

Finally, we can aggregate to get back to a dataframe with one row per distinct Text:

res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()

    +-----------------+-------------+----------------+
    |             Text|     Keywords|whitelist_counts|
    +-----------------+-------------+----------------+
    |this client has l| [client, LA]|               0|
    |the client as ada|[ada, client]|               0|
    +-----------------+-------------+----------------+
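
Because the sample data happens to produce only zeros, it can help to sanity-check the logic on an input that does match. Below is a plain-Python sketch of the same four steps (explode, left outer join, count, aggregate). The first entry in texts is a hypothetical example that is not in the question, and all variable names are illustrative.

```python
from collections import defaultdict

whitelist = [
    ("LA", "LA city"),
    ("LA", "US LA in da"),
    ("client", "this client has i"),
    ("client", "our client"),
]
# The first text is hypothetical, chosen so that some counts are non-zero.
texts = [
    ("our client moved to LA city", "client;LA"),
    ("the client as ada", "client;ada"),
]

# Step 1: explode, one (text, keyword) row per keyword.
exploded = [(text, kw) for text, kws in texts for kw in kws.split(";")]

# Step 2: left outer join on keyword; unmatched keywords get a None term.
terms_by_keyword = defaultdict(list)
for kw, term in whitelist:
    terms_by_keyword[kw].append(term)
joined = [
    (text, kw, term)
    for text, kw in exploded
    for term in (terms_by_keyword.get(kw) or [None])
]

# Step 3: count literal occurrences, treating a None term as 0.
counted = [
    (text, kw, text.count(term) if term is not None else 0)
    for text, kw, term in joined
]

# Step 4: aggregate, summing the counts per distinct text.
totals = defaultdict(int)
for text, _, n in counted:
    totals[text] += n
result = dict(totals)

print(result)
```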

MaFF answered Oct 18 '22 16:10