PySpark: PicklingError: Could not serialize object: TypeError: can't pickle CompiledFFI objects

I'm new to the PySpark environment and came across an error while trying to encrypt data in an RDD with the cryptography module. Here's the code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('encrypt').getOrCreate()

df = spark.read.csv('test.csv', inferSchema=True, header=True)
df.show()
df.printSchema()

from cryptography.fernet import Fernet
key = Fernet.generate_key()
f = Fernet(key)

dfRDD = df.rdd
print(dfRDD)
mappedRDD = dfRDD.map(lambda value: (value[0], str(f.encrypt(str.encode(value[1]))), value[2] * 100))
data = mappedRDD.toDF()
data.show()

Everything works fine until I try mapping value[1] with str(f.encrypt(str.encode(value[1]))). I receive the following error:

PicklingError: Could not serialize object: TypeError: can't pickle CompiledFFI objects

I have not seen too many resources referring to this error and wanted to see if anyone else has encountered it (or if via PySpark you have a recommended approach to column encryption).

asked Aug 21 '17 by Byrdann Fox

1 Answer

recommended approach to column encryption

You may consider Hive built-in encryption (HIVE-5207, HIVE-6329), but it is fairly limited at the moment (HIVE-7934).

Your current code doesn't work because Fernet objects are not serializable (they wrap CompiledFFI handles, which cannot be pickled). You can make it work by distributing only the keys:

def f(value, key=key):
    # Build the Fernet inside the function, so only the key
    # (plain bytes) is captured and shipped to the workers.
    return value[0], str(Fernet(key).encrypt(str.encode(value[1]))), value[2] * 100

mappedRDD = dfRDD.map(f)
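The difference is easy to check outside Spark: the key is plain bytes and pickles fine, so a closure that captures only the key can cross the wire, while the Fernet itself never has to. A minimal sketch in plain Python (no Spark, assuming the cryptography package is installed; encrypt_value is a hypothetical helper name):

```python
from cryptography.fernet import Fernet

key = Fernet.generate_key()

def encrypt_value(value, key=key):
    # Construct the Fernet inside the function, so the closure
    # only captures the key (plain bytes, trivially picklable).
    return Fernet(key).encrypt(str.encode(value))

token = encrypt_value("secret")
# The round trip recovers the original plaintext.
assert Fernet(key).decrypt(token) == b"secret"
```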

or

def g(values, key=key):
    # Build the Fernet once per partition instead of once per row.
    f = Fernet(key)
    for value in values:
        yield value[0], str(f.encrypt(str.encode(value[1]))), value[2] * 100

mappedRDD = dfRDD.mapPartitions(g)
answered Sep 18 '22 by Alper t. Turker