
java.lang.NoClassDefFoundError: Could not initialize class when launching spark job via spark-submit in scala code

I have code that looks like this:

object ErrorTest {
case class APIResults(status:String, col_1:Long, col_2:Double, ...)

def funcA(rows:ArrayBuffer[Row])(implicit defaultFormats:DefaultFormats):ArrayBuffer[APIResults] = {
  // call some API, get the results, and return them as APIResults
  ...
}

// MARK: load properties
val props = loadProperties()
private def loadProperties(): Properties =  {
  val configFile = new File("config.properties")
  val reader = new FileReader(configFile)
  val props = new Properties()
  props.load(reader)
  props
}

def main(args: Array[String]): Unit = {
  val prop_a = props.getProperty("prop_a")

  val session = Context.initialSparkSession();
  import session.implicits._

  val initialSet = ArrayBuffer.empty[Row]
  val addToSet = (s: ArrayBuffer[Row], v: Row) => (s += v)
  val mergePartitionSets = (p1: ArrayBuffer[Row], p2: ArrayBuffer[Row]) => (p1 ++= p2)

  val sql1 =
    s"""
       select * from tbl_a where ...
     """

  session.sql(sql1)
    .rdd.map{row => {implicit val formats = DefaultFormats; (row.getLong(6), row)}}
    .aggregateByKey(initialSet)(addToSet,mergePartitionSets)
    .repartition(40)
    .map{case (rowNumber,rows) => {implicit val formats = DefaultFormats; funcA(rows)}}
    .flatMap(x => x)
    .toDF()
    .write.mode(SaveMode.Overwrite).saveAsTable("tbl_b")
}
}

When I run it via spark-submit, it throws Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$. But if I move val props = loadProperties() to the first line of the main method, the error goes away. Could anyone explain this behavior? Thanks a lot!

Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$
  at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208)
  at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
  ... 8 more
KAs asked Aug 22 '17 18:08


2 Answers

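I guess the problem is that val props = loadProperties() defines a member of the enclosing object ErrorTest rather than a local variable of main. The closures passed to map call funcA, so each executor has to initialize ErrorTest$ itself, and that re-runs loadProperties() on the executor (the object is not shipped over from the driver). The executors do not have the same environment as the driver: if config.properties is not present in their working directory, the initializer fails there, and every later access to the object reports Could not initialize class. Once the val is moved into main, it runs only on the driver.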
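
The JVM behavior behind that error message can be reproduced without Spark. The following is a minimal sketch, not the asker's code: Holder, InitDemo, and the file name no-such-config.properties are hypothetical, and the file is assumed not to exist. It assumes the objects are compiled as top-level objects with scalac (not pasted into the REPL).

```scala
import java.io.FileReader
import java.util.Properties

// Hypothetical stand-in for ErrorTest: the val is initialized as part of
// the object's static initializer, the first time the object is touched.
object Holder {
  val props: Properties = {
    val p = new Properties()
    // Assumed to be missing, so this throws FileNotFoundException.
    val reader = new FileReader("no-such-config.properties")
    try p.load(reader) finally reader.close()
    p
  }

  def touch(): Int = props.size()
}

object InitDemo {
  // Touches Holder and reports which error class the JVM raised, if any.
  def attempt(): String =
    try { Holder.touch(); "ok" }
    catch { case e: LinkageError => e.getClass.getName }

  def main(args: Array[String]): Unit = {
    // First access: the initializer runs and fails.
    println(attempt()) // java.lang.ExceptionInInitializerError
    // Later accesses: the class is marked as failed and is not retried.
    println(attempt()) // java.lang.NoClassDefFoundError
  }
}
```

Only the first failure carries the real cause (here a FileNotFoundException), so on a cluster it is worth searching the executor logs for an earlier ExceptionInInitializerError from the same executor.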

Morrissss answered Nov 17 '22 02:11


I ran into the same problem. I defined a method convert outside the main method. When I used it in main as dataframe.rdd.map{x => convert(x)}, I got NoClassDefFoundError: Could not initialize class Test$.

But when I used a function object convertor, with the same body as the convert method, defined inside the main method, no error occurred.

I used Spark 2.1.0 and Scala 2.11. Could this be a bug in Spark?
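
One possible reading of this, which the thread does not confirm: a closure that calls a method of an object forces that object to be initialized wherever the closure runs, while a function value defined inside main travels with the closure. Under that assumption, a sketch of one way to structure the job is to keep helpers in an object whose initializer has no side effects and to load configuration lazily in a separate one. All names here (Converters, DriverConfig, SeparationDemo) are hypothetical, and a plain Seq stands in for the RDD so the example runs without Spark.

```scala
import java.io.FileReader
import java.util.Properties

// Helpers that closures may call: no vals, so initializing this object
// anywhere cannot fail.
object Converters {
  def convert(x: Int): Int = x * 2
}

// Driver-only state: lazy, so the file is read only when props is first
// used, not when some other member of the object is referenced.
object DriverConfig {
  lazy val props: Properties = {
    val p = new Properties()
    val reader = new FileReader("config.properties")
    try p.load(reader) finally reader.close()
    p
  }
}

object SeparationDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for dataframe.rdd.map { x => Converters.convert(x) }.
    // DriverConfig is never touched, so no file is needed here.
    val result = Seq(1, 2, 3).map(x => Converters.convert(x))
    println(result) // List(2, 4, 6)
  }
}
```

In a real job, values read from DriverConfig.props would be copied into local vals in main before being used inside a closure, so that only the plain values are captured.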

cnstevenyu answered Nov 17 '22 02:11