
Why does spark-shell print thousands of lines of code after count on a DataFrame with 3000 columns? What is this JaninoRuntimeException about 64 KB?

(With spark-2.1.0-bin-hadoop2.7 version from the official website on local machine)

When I executed a simple Spark command in spark-shell, it started printing thousands and thousands of lines of code before throwing an error. What is this "code"?

I was running spark on my local machine. The command I ran was a simple df.count where df is a DataFrame.

Please see a screenshot below (the code flies by so fast that I could only take screenshots to see what is going on). More details are below the image.

[Screenshot: spark-shell printing thousands of lines of generated Java code before the error]

More details:

I created the DataFrame df with:

val df: DataFrame = spark.createDataFrame(rows, schema)
// rows: RDD[Row]
// schema: StructType
// There were about 3000 columns and 700 rows (testing set) of data in df. 
// The following line ran successfully and returned the correct value
rows.count
// The following line threw an exception after printing out tons of code, as shown in the screenshot above
df.count
df.count
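
For reference, here is a minimal, self-contained sketch of how a DataFrame of roughly this shape could be built; the column names, values, and session setup are made up for illustration and are not from the original post:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// In spark-shell the session already exists as `spark`; this line is only needed in a standalone app.
val spark = SparkSession.builder().master("local[*]").appName("wide-df-repro").getOrCreate()

// Roughly 3000 numeric columns and 700 rows, mirroring the shape described above.
val numCols = 3000
val numRows = 700

val schema = StructType((0 until numCols).map(i => StructField(s"c$i", LongType, nullable = false)))
val rows = spark.sparkContext.parallelize(
  (0 until numRows).map(r => Row.fromSeq(Seq.fill(numCols)(r.toLong)))
)

val df = spark.createDataFrame(rows, schema)
df.count  // on affected 2.x builds this is the call that triggers the wall of generated code and the 64 KB error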

The exception thrown after the "code" is:

...
/* 181897 */     apply_81(i);
/* 181898 */     result.setTotalSize(holder.totalSize());
/* 181899 */     return result;
/* 181900 */   }
/* 181901 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959) 

Edit: As @TzachZohar pointed out, this looks like one of the known bugs (https://issues.apache.org/jira/browse/SPARK-16845) that was fixed in the Spark project but not yet released.
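
For anyone checking whether their build already includes the fix, the running version can be read directly in spark-shell:

spark.version   // e.g. "2.1.0" for the release build used above
sc.version      // same value, reported by the SparkContext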

I pulled Spark master, built it from source, and retried my example. This time I got a new exception following the generated code:

/* 308608 */     apply_1560(i);
/* 308609 */     apply_1561(i);
/* 308610 */     result.setTotalSize(holder.totalSize());
/* 308611 */     return result;
/* 308612 */   }
/* 308613 */ }

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)

It looks like a pull request is addressing the second problem: https://github.com/apache/spark/pull/16648



1 Answer

This is a bug. It is related to the Java code that Spark generates and compiles at runtime, which can run into hard JVM limits (a single method may not exceed 64 KB of bytecode, and a class's constant pool is capped at 0xFFFF entries), so it has proven hard to resolve. (There is much discussion on the Spark JIRA.)
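
If you want to see what that generated code looks like for yourself, Spark ships a debug helper that prints the Java source produced for a query. A minimal sketch, assuming a Spark 2.x spark-shell session:

import org.apache.spark.sql.execution.debug._

// Print the Java that Catalyst generates for a small query; the thousands of
// lines in the question are the same kind of output for a 3000-column projection.
val small = spark.range(10).toDF("id")
small.debugCodegen()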

The error occurred for me when doing row-wise operations. Even df.head() on a DataFrame of 700 rows would cause the exception.

The workaround for me was to convert the DataFrame to a sparse RDD (i.e., RDD[LabeledPoint]) and run the row-wise operations on the RDD. That is much faster and more memory efficient. However, it only works with numeric data; categorical variables (factors, the target, etc.) need to be converted to numeric values first.

That said, I am new to Scala myself, so my code is probably a tad amateurish, but it works.

Convert a Row to a LabeledPoint

// Imports assumed for the snippets below (they were not shown in the original answer):
import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField}

@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint =
{
  try
  {
    logger.info(s"fieldNameSeq $fieldNameSeq")  // logger is assumed to be defined in the enclosing class
    // The selected columns may be Int or Long, so read them as java.lang.Number
    val values: Map[String, Number] = rowIn.getValuesMap[Number](fieldNameSeq)

    // Sort by column name so every row yields its features in the same order
    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)

    //println(s"convertRowToLabeledPoint row values ${sortedValuesMap}")
    print(".")

    val rowValuesItr: Iterable[Number] = sortedValuesMap.values

    var positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    var valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0

    // Keep only the non-zero entries, remembering their column positions
    rowValuesItr.foreach
    {
      kv =>
        if (kv.longValue > 0)
        {
          valuesArray += kv.doubleValue
          positionsArray += currentPosition
        }
        currentPosition = currentPosition + 1
    }

    // The sparse vector's length is the total number of fields, not just the non-zero count
    new LabeledPoint(label, Vectors.sparse(sortedValuesMap.size, positionsArray.toArray, valuesArray.toArray))
  }
  catch
  {
    case ex: Exception => throw new Exception(ex)
  }
}

private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame =
{
  //println("castColumnTo")
  df.withColumn(cn, df(cn).cast(tpe))
}

Provide a DataFrame and return an RDD[LabeledPoint]

@throws(classOf[Exception])
def convertToLibSvm(spark: SparkSession, mDF: DataFrame, targetColumnName: String): RDD[LabeledPoint] =
{
  try
  {
    // Only integer/long columns go into the sparse feature vector
    val fieldSeq: scala.collection.Seq[StructField] =
      mDF.schema.fields.toSeq.filter(f => f.dataType == IntegerType || f.dataType == LongType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

    // Index the categorical target column and cast the resulting index to an integer label
    val indexer = new StringIndexer()
      .setInputCol(targetColumnName)
      .setOutputCol(targetColumnName + "_Indexed")
    val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName)
    val mDFFinal = castColumnTo(mDFTypedIndexed, targetColumnName + "_Indexed", IntegerType)

    //mDFFinal.show()

    var positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()

    // Convert each row to a sparse LabeledPoint. Note that collect() pulls the whole
    // DataFrame to the driver, so this only suits data sets that fit in driver memory.
    mDFFinal.collect().foreach
    {
      row => positionsArray += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs[Int](targetColumnName + "_Indexed"))
    }

    spark.sparkContext.parallelize(positionsArray.toSeq)
  }
  catch
  {
    case ex: Exception => throw new Exception(ex)
  }
}
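
A minimal usage sketch (the DataFrame name mDF and the "target" column are placeholders, not from the original answer):

// mDF is the wide DataFrame that made df.count blow up; "target" is its categorical label column.
val libSvmRdd: RDD[LabeledPoint] = convertToLibSvm(spark, mDF, "target")

// Row-wise work now happens on the RDD, so Catalyst never has to compile the huge projection.
println(libSvmRdd.count())
println(libSvmRdd.first().features.size)

Since convertToLibSvm collects the whole DataFrame to the driver before re-parallelizing it, this is only practical for data that fits in driver memory; the 700-row test set described in the question is well within that.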