I wrote a Spark Streaming application built with sbt. It works perfectly fine locally, but after deploying it on the cluster, it complains about a class I wrote which is clearly in the fat jar (checked using jar tvf). The following is my project structure; the XXX object is the one Spark complains about:
src
`-- main
    `-- scala
        |-- packageName
        |   `-- XXX object
        `-- mainMethodEntryObject
My submit command:
$SPARK_HOME/bin/spark-submit \
  --class mainMethodEntryObject \
  --master REST_URL \
  --deploy-mode cluster \
  hdfs:///FAT_JAR_PRODUCED_BY_SBT_ASSEMBLY
Specific error message:
java.lang.NoClassDefFoundError: Could not initialize class XXX
I ran into this issue for a reason similar to this user: http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-NoClassDefFoundError-is-this-a-bug-td18972.html
I was calling a method on an object that had a few variables defined on the object itself, including spark and a logger, like this
val spark = SparkSession
  .builder()
  .getOrCreate()

val logger = LoggerFactory.getLogger(this.getClass.getName)
The function I was calling called another function on the object, which called another function, which called yet another function on the object inside a flatMap call on an RDD. I was getting the NoClassDefFoundError in a stack trace where the previous two function calls in the stack were functions on the class Spark was telling me did not exist. Based on the conversation linked above, my hypothesis was that the global spark reference wasn't getting initialized by the time the function that used it was called (the one that resulted in the NoClassDefFoundError exception).
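To make the shape of the problem concrete, here is a minimal sketch of roughly how my code was laid out (the names are hypothetical). Because logger is referenced inside the flatMap, the whole object, including the spark val, has to be initialized on the executor:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

// Hypothetical reconstruction of the failing layout
object MyClass {
  val spark = SparkSession.builder().getOrCreate()
  val logger = LoggerFactory.getLogger(this.getClass.getName)

  def entryPoint(rdd: RDD[String]): RDD[String] = stepOne(rdd)

  def stepOne(rdd: RDD[String]): RDD[String] = stepTwo(rdd)

  def stepTwo(rdd: RDD[String]): RDD[String] =
    rdd.flatMap { line =>
      // Touching logger here forces MyClass to be initialized on the executor,
      // which also tries to run the spark val's initializer there and can fail
      // with "Could not initialize class MyClass"
      logger.debug(s"processing $line")
      line.split(" ")
    }
}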
After quite a few experiments, I found that this pattern worked to resolve the problem.
// Move global definitions here
object MyClassGlobalDef {
  val spark = SparkSession
    .builder()
    .getOrCreate()

  val logger = LoggerFactory.getLogger(this.getClass.getName)
}

// Force the globals object to be initialized
import MyClassGlobalDef._

object MyClass {
  // Functions here
}
It's kind of ugly, but Spark seems to like it.
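For what it's worth, the rest of the code barely changes. A hypothetical driver-side usage looks like this, with spark and logger resolved through the import:

import MyClassGlobalDef._

object MyClass {
  // spark and logger now come from MyClassGlobalDef via the import above
  def run(inputPath: String): Unit = {
    logger.info(s"reading from $inputPath")
    val lines = spark.read.textFile(inputPath)
    logger.info(s"read ${lines.count()} lines")
  }
}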
It's difficult to say without the code, but it looks like a serialization problem with your XXX object. I can't say I understand perfectly why, but the point is that the object is not shipped to the executor.
The solution that worked for me was to convert your object to a class that extends Serializable and just instantiate it where you need it. So basically, if I'm not wrong, you have

object test {
  def foo = ...
}

which would be used as test.foo in your main, but you need at minimum

class Test extends Serializable {
  def foo = ...
}

and then in your main have val test = new Test at the beginning, and that's it.
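Put together, a minimal, hypothetical example of that pattern (Test, foo and Main are placeholder names) could look like this. Because test is a plain Serializable instance, it gets captured by the closure and shipped to the executors along with it:

import org.apache.spark.sql.SparkSession

class Test extends Serializable {
  def foo(s: String): Int = s.length
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("example").getOrCreate()
    val test = new Test // instantiate it where you need it

    val lengths = spark.sparkContext
      .parallelize(Seq("a", "bb", "ccc"))
      .map(line => test.foo(line)) // test is serialized with the closure
      .collect()

    println(lengths.mkString(", "))
    spark.stop()
  }
}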