CodeGen grows beyond 64 KB error when normalizing large PySpark dataframe

I have a PySpark dataframe with 13 million rows and 800 columns. I need to normalize this data so have been using this code, which works with a smaller development data set.

import sys
from pyspark.sql.functions import avg, stddev_pop
from pyspark.sql.window import Window

def z_score_w(col, w):
    avg_ = avg(col).over(w)
    stddev_ = stddev_pop(col).over(w)
    return (col - avg_) / stddev_

w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]

normDF = signalsDF.select(norm_exprs)
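For reference, each output column is meant to hold a population z-score. A minimal plain-Python sketch of that computation (no Spark involved; the helper name z_score_reference is hypothetical) can be handy for checking results on a small sample:

```python
import statistics


def z_score_reference(values):
    """Population z-score of a list of numbers, mirroring avg and stddev_pop."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)  # population standard deviation, like stddev_pop
    return [(v - mean) / std for v in values]


# Small made-up sample, only for illustration
sample = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
normalized = z_score_reference(sample)
```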

However, when using the full data set I run into an exception with the codegen:

        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
        at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
        at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
        at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
        at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
        at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267)

There are a few Spark JIRA issues around that appear similar, but these are all marked resolved. There is also this SO question which is relevant, but the answer is an alternative technique.

I have my own workaround where I normalize batches of columns of the dataframe. This works, but I end up with multiple dataframes that I then have to join, which is slow.
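As an illustration of the batching idea, the column list can be split into fixed-size chunks, each of which would then be normalized with its own select. The helper name and the batch size below are hypothetical.

```python
def column_batches(columns, batch_size):
    """Yield consecutive slices of the column list, each at most batch_size long."""
    for start in range(0, len(columns), batch_size):
        yield columns[start:start + batch_size]


# Hypothetical column names standing in for signalsDF.columns
columns = ["c{}".format(i) for i in range(800)]
batches = list(column_batches(columns, 100))
```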

So, my question is - is there an alternative technique for normalizing large dataframes that I'm missing?

I'm using spark-2.0.1.

2 Answers

One obvious problem is the way you use window functions. The following frame:

Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)

is of little use in practice. Without a partition column, Spark first shuffles all the data to a single partition. Scaling with window functions is useful only when you need to scale within groups.

Spark provides two classes which can be used to scale features:

  • pyspark.ml.feature.StandardScaler
  • pyspark.mllib.feature.StandardScaler

Unfortunately, both require Vector data as input. With ML:

from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
from pyspark.ml import Pipeline

scaled = Pipeline(stages=[
    VectorAssembler(inputCols=df.columns, outputCol="features"),
    MLScaler(withMean=True, inputCol="features", outputCol="scaled")
]).fit(df).transform(df)

This requires further expansion of the scaled column if you need the original shape.
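One way to get back to one column per feature is sketched below. It assumes the output column is named scaled, as in the pipeline, and that going through the RDD API is acceptable; the helper names are hypothetical.

```python
def vector_row_to_list(row):
    """Turn a one-field Row holding an ML vector into a plain list of floats."""
    return row[0].toArray().tolist()


def expand_vector_column(scaled_df, vector_col, column_names):
    """Rebuild a DataFrame with one column per vector element."""
    return (scaled_df
            .select(vector_col)
            .rdd
            .map(vector_row_to_list)
            .toDF(column_names))
```

It would be called as expand_vector_column(scaled, "scaled", df.columns).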

With MLlib:

from pyspark.mllib.feature import StandardScaler as MLLibScaler
from pyspark.mllib.linalg import DenseVector

rdd = df.rdd.map(DenseVector)
scaler = MLLibScaler(withMean=True, withStd=True)

scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)

The latter method can be more useful if there are codegen issues related to the number of columns.

Another way to approach this problem is to compute global statistics:

from pyspark.sql.functions import avg, col, stddev_pop, struct

stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()

and select:

df.select(*[
    ((col(c) - mean) / std).alias(c)
    for (c, (mean, std)) in zip(df.columns, stats)
])
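With 800 columns, some may be constant or entirely null, in which case the standard deviation is 0 or missing and the division yields null (assuming Spark's default, non-ANSI arithmetic). A hedged sketch of a guard, using a hypothetical helper that runs on the driver over the collected stats row:

```python
def safe_stats(stats, fallback_std=1.0):
    """Return (mean, std) pairs, replacing a zero or missing std with fallback_std."""
    cleaned = []
    for mean, std in stats:
        if std is None or std == 0:
            std = fallback_std
        cleaned.append((mean, std))
    return cleaned


# Plain tuples stand in for the struct values returned by df.agg(...).first()
example = [(10.0, 2.0), (3.0, 0.0), (None, None)]
guarded = safe_stats(example)
```

The select would then iterate over zip(df.columns, safe_stats(stats)), so a constant column becomes all zeros rather than all nulls.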

Following your comments, the simplest solution I can think of can be expressed using NumPy and a few basic transformations:

import numpy as np

rdd = df.rdd.map(np.array)  # Convert to RDD of NumPy vectors
stats = rdd.stats()  # Compute mean and std
scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev())  # Normalize
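Statistics like these can be computed in a single pass because a count, a mean and a sum of squared deviations can be accumulated per partition and then merged. The sketch below illustrates that idea in plain Python for a single column. It is an illustration of the technique (Welford-style updates with a pairwise merge), not PySpark's actual code, and the class name is hypothetical.

```python
import math
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Count, mean and sum of squared deviations for one column."""
    n: int = 0
    mean: float = 0.0
    m2: float = 0.0

    def add(self, x):
        # Welford's update for a single new value
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        return self

    def merge(self, other):
        # Combine the statistics of two disjoint chunks of data
        if other.n == 0:
            return self
        if self.n == 0:
            self.n, self.mean, self.m2 = other.n, other.mean, other.m2
            return self
        total = self.n + other.n
        delta = other.mean - self.mean
        self.m2 += other.m2 + delta * delta * self.n * other.n / total
        self.mean += delta * other.n / total
        self.n = total
        return self

    def stdev(self):
        # Population standard deviation
        return math.sqrt(self.m2 / self.n)


# Two made-up "partitions" of the same column
left, right = RunningStats(), RunningStats()
for x in [1.0, 2.0, 3.0]:
    left.add(x)
for x in [4.0, 5.0, 6.0, 7.0]:
    right.add(x)
merged = left.merge(right)
```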

The result can then be converted back to a DataFrame:

scaled.map(lambda x: x.tolist()).toDF(df.columns)

Please see this link; we resolved this error by adding checkpoints in the code.

A checkpoint here simply means writing the data or dataframe to disk and reading it back.
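A minimal sketch of that write-and-read-back pattern, assuming Parquet as the storage format and a path that may be overwritten (the helper name and both assumptions are illustrative):

```python
def checkpoint_dataframe(spark, df, path):
    """Materialize df at path and return a fresh DataFrame read from it.

    The returned DataFrame starts from a plain file scan instead of the
    full chain of upstream transformations.
    """
    df.write.mode("overwrite").parquet(path)
    return spark.read.parquet(path)
```

Here spark is the active SparkSession, and later transformations would be applied to the returned DataFrame rather than to the original one.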


Details on checkpoint


Q: What kind of RDD needs a checkpoint?

  • the computation takes a long time
  • the computing chain is too long
  • it depends on too many RDDs

Actually, saving the output of a ShuffleMapTask to local disk is also a form of checkpointing, but it only covers the partition's output data.

Q: When to checkpoint?

As mentioned above, every time a computed partition needs to be cached, it is cached in memory. Checkpointing, however, does not follow the same principle. Instead, it waits until the end of a job and launches another job to finish the checkpoint. An RDD that needs to be checkpointed will therefore be computed twice, so it is suggested to call rdd.cache() before rdd.checkpoint(). In that case, the second job will not recompute the RDD; it will just read the cache. Spark also offers rdd.persist(StorageLevel.DISK_ONLY), which caches the RDD on disk during its first computation, but this kind of persist and checkpoint are different; we will discuss the difference later.
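In PySpark, the suggested order might look like the sketch below. The helper name is hypothetical, and count() is just one choice of action to trigger the job.

```python
def cache_then_checkpoint(sc, rdd, checkpoint_dir):
    """Cache an RDD before checkpointing so the checkpoint job can read the cache."""
    sc.setCheckpointDir(checkpoint_dir)  # where checkpoint files are written
    rdd.cache()        # keep computed partitions around for the checkpoint job
    rdd.checkpoint()   # only marks the RDD; nothing is written yet
    rdd.count()        # an action runs the job, after which the checkpoint is written
    return rdd
```

Here sc is the SparkContext, and checkpoint_dir would normally point to reliable storage such as HDFS.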

Q: How to implement checkpoint?

Here is the procedure:

RDD will be: [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]. In the end, it will be checkpointed.

Similarly for a dataframe: write the dataframe to disk or S3 and read the data back into a new dataframe.

Initialized

On the driver side, after rdd.checkpoint() is called, the RDD will be managed by RDDCheckpointData. The user should set the storage path for the checkpoint (on HDFS).

marked for checkpointing

After initialization, RDDCheckpointData will mark the RDD as MarkedForCheckpoint.

checkpointing in progress

When a job is finished, finalRdd.doCheckpoint() will be called. finalRdd scans the computing chain backward. When it meets an RDD that needs to be checkpointed, that RDD is marked CheckpointingInProgress, and the configuration files (for writing to HDFS), such as core-site.xml, are broadcast to the blockManager of the other worker nodes. After that, a job is launched to finish the checkpoint:

  rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString,  broadcastedConf))


checkpointed

After the job finishes the checkpoint, it cleans all the dependencies of the RDD and marks the RDD as checkpointed. It then adds a supplementary dependency and sets the parent RDD to a CheckpointRDD. The CheckpointRDD will be used in the future to read the checkpoint files from the file system and generate the RDD partitions.

What's interesting is the following:

Two RDDs are checkpointed in the driver program, but only the result (see code below) is successfully checkpointed. I am not sure whether this is a bug or whether only the downstream RDD is intentionally checkpointed.

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

val result = pairs1.join(pairs2)
