 

AWS Comprehend + Pyspark UDF = Error: can't pickle SSLContext objects

When I apply a PySpark UDF that calls an AWS API, I get the following error:

PicklingError: Could not serialize object: TypeError: can't pickle SSLContext objects

The code is

import pyspark.sql.functions as sqlf
import boto3

comprehend = boto3.client('comprehend', region_name='us-east-1')

def detect_sentiment(text):
  response = comprehend.detect_sentiment(Text=text, LanguageCode='pt')
  return response["SentimentScore"]["Positive"]

detect_sentiment_udf = sqlf.udf(detect_sentiment)

test = df.withColumn("Positive", detect_sentiment_udf(df.Conversa))

Here, df.Conversa contains short, simple strings. How can I solve this, or what would be an alternative approach?

Asked by Rafael Ferro, Sep 14 '25 11:09



1 Answer

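When you apply the UDF, PySpark pickles the Python function so it can be sent to the executors, and with it everything the function references, here the module-level comprehend client. A boto3 client is NOT serializable (it holds SSL state, hence the SSLContext in the error), so you need to create the client inside the function that the UDF calls.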
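A minimal, stdlib-only illustration of the root cause, assuming (as the error message suggests) that the unpicklable part of the client is an ssl.SSLContext. The context itself cannot be pickled, while plain configuration such as the service and region names can. That is why the fix ships only the configuration and builds the client where it is used. The helper name is_picklable is hypothetical.

```python
import pickle
import ssl


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Stand-in for the boto3 client's SSL state (an assumption based on the error).
ssl_context = ssl.create_default_context()

# Plain configuration: safe to capture in a UDF and ship to the executors.
client_config = {"service_name": "comprehend", "region_name": "us-east-1"}

print(is_picklable(ssl_context))    # False
print(is_picklable(client_config))  # True
```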

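You will get the same error if you use an object's method as the UDF and the object stores the client as an attribute, because the instance (self) is pickled along with the method. To fix it, expose the client through a property that builds it on access: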

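import pyspark.sql.functions as sqlf
import boto3

class Foo:
    # Don't do this: a client stored on the instance is pickled together with
    # self when self.my_udf is shipped to the executors, which raises the error.
    # def __init__(self):
    #     self.client = boto3.client('comprehend', region_name='us-east-1')

    # Do this instead: the property builds a new client each time it is
    # accessed, so no client is ever stored on the instance.
    @property
    def client(self):
        return boto3.client('comprehend', region_name='us-east-1')

    def my_udf(self, text):
        response = self.client.detect_sentiment(Text=text, LanguageCode='pt')
        return response["SentimentScore"]["Positive"]

    def add_sentiment_column(self, df):
        # The score is a float, so declare a double return type (default is string).
        detect_sentiment_udf = sqlf.udf(self.my_udf, "double")
        return df.withColumn("Positive", detect_sentiment_udf(df.Conversa))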
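To see why the property helps, here is a stdlib-only sketch with hypothetical names (EagerFoo, LazyFoo, make_client), where an ssl.SSLContext stands in for the boto3 client. It pickles only the instance state in __dict__, which is the part that differs between the two patterns: the eagerly stored client ends up in that state, while the property never stores anything.

```python
import pickle
import ssl


def make_client():
    # Hypothetical stand-in for boto3.client(...); an SSLContext is the
    # unpicklable piece named in the error message.
    return ssl.create_default_context()


class EagerFoo:
    def __init__(self):
        self.client = make_client()  # stored in the instance __dict__


class LazyFoo:
    @property
    def client(self):
        return make_client()  # built on access, never stored


def state_is_picklable(obj):
    """Try to pickle the instance state (__dict__) that travels with obj."""
    try:
        pickle.dumps(vars(obj))
        return True
    except (TypeError, pickle.PicklingError):
        return False


print(state_is_picklable(EagerFoo()))  # False
print(state_is_picklable(LazyFoo()))   # True
```

Note that the property builds a fresh client on every access, so my_udf creates one client per row.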

In your case, @johnhill2424's suggestion fixes the problem: create the client inside the function.

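import pyspark.sql.functions as sqlf
import boto3

def detect_sentiment(text):
    # Create the client inside the function, so it is built on the executor
    # instead of being pickled with the function (one client per call).
    comprehend = boto3.client('comprehend', region_name='us-east-1')
    response = comprehend.detect_sentiment(Text=text, LanguageCode='pt')
    return response["SentimentScore"]["Positive"]

# The score is a float, so declare a double return type (default is string).
detect_sentiment_udf = sqlf.udf(detect_sentiment, "double")

test = df.withColumn("Positive", detect_sentiment_udf(df.Conversa))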
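As for an alternative approach: building a client per row works, but likely adds overhead. One option is to create a single client per partition. The sketch below assumes this pattern fits your job; score_partition and FakeComprehend are hypothetical names, and the fake client only lets the logic run offline. The Spark wiring in the trailing comments uses RDD.mapPartitions and is untested.

```python
from typing import Any, Callable, Iterable, Iterator, Tuple


def score_partition(
    texts: Iterable[str],
    make_client: Callable[[], Any],
    language_code: str = "pt",
) -> Iterator[Tuple[str, float]]:
    """Score every text in one partition using a single client."""
    client = make_client()  # created once per partition, on the executor
    for text in texts:
        response = client.detect_sentiment(Text=text, LanguageCode=language_code)
        yield text, response["SentimentScore"]["Positive"]


class FakeComprehend:
    """Hypothetical offline stand-in for boto3.client('comprehend')."""

    def detect_sentiment(self, Text, LanguageCode):
        return {"SentimentScore": {"Positive": 1.0 if "bom" in Text else 0.0}}


results = list(score_partition(["muito bom", "ruim"], FakeComprehend))
print(results)  # [('muito bom', 1.0), ('ruim', 0.0)]

# Possible Spark wiring (untested sketch; needs boto3 on the executors):
# scored = (df.rdd.map(lambda row: row.Conversa)
#             .mapPartitions(lambda texts: score_partition(
#                 texts, lambda: boto3.client('comprehend', region_name='us-east-1')))
#             .toDF(["Conversa", "Positive"]))
```

Because make_client is called lazily inside each partition, only the small factory lambda is pickled, never a live client.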
Answered by Daniel R Carletti, Sep 17 '25 01:09
