(Using spark-2.1.0-bin-hadoop2.7 from the official website, on a local machine)
When I execute a simple Spark command in spark-shell, it starts printing out thousands upon thousands of lines of code before throwing an error. What is all this "code"?
I was running Spark on my local machine. The command I ran was a simple df.count, where df is a DataFrame.
Please see the screenshot below (the code flies by so fast I could only take screenshots to see what was going on).
More details:
I created the DataFrame df by:
val df: DataFrame = spark.createDataFrame(rows, schema)
// rows: RDD[Row]
// schema: StructType
// There were about 3000 columns and 700 rows (testing set) of data in df.
// The following line ran successfully and returned the correct value
rows.count
// The following line threw an exception after printing out tons of generated code, as shown in the screenshot above
df.count
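For reference, here is a minimal sketch of how such a wide DataFrame can be built to reproduce the setup; the column names and values below are made up, only the rough dimensions match my testing set:
// In spark-shell, where `spark` is already available
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Roughly the shape of my testing set: ~3000 integer columns, ~700 rows
val numCols = 3000
val numRows = 700

val schema = StructType((1 to numCols).map(i => StructField(s"c$i", IntegerType, nullable = false)))
val rows = spark.sparkContext.parallelize(
  (1 to numRows).map(r => Row.fromSeq(Seq.fill(numCols)(r % 2))))

val df: DataFrame = spark.createDataFrame(rows, schema)

rows.count // works fine
df.count   // prints the huge generated code and then throws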
The exception thrown after the generated code is:
...
/* 181897 */ apply_81(i);
/* 181898 */ result.setTotalSize(holder.totalSize());
/* 181899 */ return result;
/* 181900 */ }
/* 181901 */ }
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:889)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:938)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
Edit: As @TzachZohar pointed out, this looks like one of the known bugs (https://issues.apache.org/jira/browse/SPARK-16845) that has been fixed in the Spark project but not yet released.
I pulled the Spark master branch, built it from source, and retried my example. Now I get a new exception following the generated code:
/* 308608 */ apply_1560(i);
/* 308609 */ apply_1561(i);
/* 308610 */ result.setTotalSize(holder.totalSize());
/* 308611 */ return result;
/* 308612 */ }
/* 308613 */ }
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:941)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:998)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:995)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
... 29 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF
at org.codehaus.janino.util.ClassFile.addToConstantPool(ClassFile.java:499)
It looks like a pull request is addressing the second problem: https://github.com/apache/spark/pull/16648
This is a bug. It is related to the Java code that Spark generates at runtime on the JVM, which seems to make it hard for the Spark developers to resolve (there is much discussion on the JIRA issues).
The error occurred for me when doing row operations. Even df.head() on a DataFrame of 700 rows would cause the exception.
My workaround was to convert the DataFrame to a sparse RDD (i.e., RDD[LabeledPoint]) and run row-wise operations on the RDD. It is much faster and more memory-efficient. However, it only works with numeric data; categorical variables (factors, the target, etc.) need to be converted to Double.
That said, I am new to Scala myself, so my code is probably a tad amateurish. But it works.
Convert a Row to a LabeledPoint
import scala.collection.immutable.ListMap
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField}

@throws(classOf[Exception])
private def convertRowToLabeledPoint(rowIn: Row, fieldNameSeq: Seq[String], label: Int): LabeledPoint = {
  try {
    logger.info(s"fieldNameSeq $fieldNameSeq") // logger is a class-level logger in my code
    val values: Map[String, Long] = rowIn.getValuesMap(fieldNameSeq)
    // Sort by column name so that feature positions are stable across rows
    val sortedValuesMap = ListMap(values.toSeq.sortBy(_._1): _*)
    //println(s"convertRowToLabeledPoint row values ${sortedValuesMap}")
    print(".")

    val rowValuesItr: Iterable[Long] = sortedValuesMap.values
    val positionsArray: ArrayBuffer[Int] = ArrayBuffer[Int]()
    val valuesArray: ArrayBuffer[Double] = ArrayBuffer[Double]()
    var currentPosition: Int = 0

    // Keep only the non-zero entries for the sparse vector
    rowValuesItr.foreach { kv =>
      if (kv > 0) {
        valuesArray += kv.toDouble
        positionsArray += currentPosition
      }
      currentPosition = currentPosition + 1
    }

    // The vector size is the number of columns, not the number of non-zero entries
    new LabeledPoint(
      label,
      org.apache.spark.mllib.linalg.Vectors.sparse(sortedValuesMap.size, positionsArray.toArray, valuesArray.toArray))
  }
  catch {
    case ex: Exception => throw new Exception(ex)
  }
}
private def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame = {
  //println("castColumnTo")
  df.withColumn(cn, df(cn).cast(tpe))
}
Provide a DataFrame and return an RDD[LabeledPoint]
@throws(classOf[Exception])
def convertToLibSvm(spark: SparkSession, mDF: DataFrame, targetColumnName: String): RDD[LabeledPoint] = {
  try {
    // Only integer/long columns are used as features (only doubles are accepted by the sparse vector, so that's what we filter for)
    val fieldSeq: scala.collection.Seq[StructField] =
      mDF.schema.fields.toSeq.filter(f => f.dataType == IntegerType || f.dataType == LongType)
    val fieldNameSeq: Seq[String] = fieldSeq.map(f => f.name)

    // Index the categorical target column and cast the index to an integer label
    val indexer = new StringIndexer()
      .setInputCol(targetColumnName)
      .setOutputCol(targetColumnName + "_Indexed")
    val mDFTypedIndexed = indexer.fit(mDF).transform(mDF).drop(targetColumnName)
    val mDFFinal = castColumnTo(mDFTypedIndexed, targetColumnName + "_Indexed", IntegerType)
    //mDFFinal.show()

    val positionsArray: ArrayBuffer[LabeledPoint] = ArrayBuffer[LabeledPoint]()
    mDFFinal.collect().foreach { row =>
      positionsArray += convertRowToLabeledPoint(row, fieldNameSeq, row.getAs(targetColumnName + "_Indexed"))
    }
    spark.sparkContext.parallelize(positionsArray.toSeq)
  }
  catch {
    case ex: Exception => throw new Exception(ex)
  }
}
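Usage looks roughly like this; the column name "target" and the actions below are just illustrative, and mDF is the wide DataFrame described above:
// Hypothetical usage: "target" is the name of the label column in mDF
val libSvmRDD: RDD[LabeledPoint] = convertToLibSvm(spark, mDF, "target")

// Row-wise operations on the RDD work where df.count / df.head blew up for me
println(libSvmRDD.count())
println(libSvmRDD.first().features.numNonzeros)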