
Spark __getnewargs__ error

I am trying to clean a Spark DataFrame by mapping it to an RDD and then back to a DataFrame. Here's a toy example:

def replace_values(row,sub_rules):
    d = row.asDict()
    for col,old_val,new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val      
    return Row(**d)
ex = sc.parallelize([{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
(ex.map(lambda row: replace_values(row,[(col,1,3) for col in ex.columns]))
    .toDF(schema=ex.schema))

Running the code above results in a Py4JError with a very long stack trace ending in the following:

Py4JError: An error occurred while calling o801.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

What is going on here? How do I fix it? I am using PySpark 1.5.2.

Paul asked Dec 23 '15 20:12


1 Answer

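The error is caused by the reference to ex.columns inside the lambda passed to .map(). The lambda captures ex itself, and ex is a DataFrame backed by a JVM object, so Spark cannot serialize the function to send it to the workers. As far as I can tell, pickling looks up __getnewargs__ on that object, Py4J forwards the lookup to the JVM as a method call, and the call fails with the message above. In general, you can't reference an RDD or DataFrame inside a function used in an RDD transformation. Spark is supposed to issue a more helpful error in this case, but apparently that didn't make it into this version.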
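The difference between capturing the whole object and capturing only its plain data can be reproduced without Spark. The following is a rough sketch, not Spark's actual code path: FakeDataFrame and can_pickle are hypothetical names made up for illustration, and a threading.Lock stands in for the JVM handle simply because it is another object that pickle refuses to serialize.

```python
import pickle
import threading


class FakeDataFrame:
    """Hypothetical stand-in for a DataFrame: plain data plus an unpicklable handle."""

    def __init__(self, columns):
        self._columns = list(columns)
        # Assumption for illustration: a lock plays the role of the JVM handle,
        # because pickle cannot serialize it either.
        self._handle = threading.Lock()

    @property
    def columns(self):
        # Return a fresh list of plain strings, detached from the handle.
        return list(self._columns)


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False if it raises."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


frame = FakeDataFrame(["age", "name"])
cols = frame.columns  # only the column names, no reference back to frame

frame_ok = can_pickle(frame)  # serializing the object drags the handle along
cols_ok = can_pickle(cols)    # serializing the plain list involves no handle
print(frame_ok, cols_ok)
```

A closure that mentions frame has to carry the whole object with it, while a closure that mentions only cols carries a list of strings. That is the same distinction the fix below relies on.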

The solution is to copy the values you need into a plain local variable before the transformation, and reference that variable inside the lambda instead:

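import copy
from pyspark.sql import Row

def replace_values(row, sub_rules):
    # Apply each (column, old value, new value) rule to one row.
    d = row.asDict()
    for col, old_val, new_val in sub_rules:
        if d[col] == old_val:
            d[col] = new_val
    return Row(**d)

ex = sc.parallelize([{'name': 'Alice', 'age': 1}, {'name': 'Bob', 'age': 2}])
ex = sqlContext.createDataFrame(ex)
# Copy the column names into a plain list so the lambda no longer captures ex.
# ex.columns already returns a list of strings, so cols = ex.columns should also work.
cols = copy.deepcopy(ex.columns)
(ex.map(lambda row: replace_values(row, [(col, 1, 3) for col in cols]))
    .toDF(schema=ex.schema))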

Paul answered Sep 27 '22 20:09